diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala similarity index 64% rename from core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala rename to core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala index 841b1390155..874362fe97a 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServer.scala @@ -18,12 +18,14 @@ package org.apache.openwhisk.core.scheduler import akka.actor.ActorSystem +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.model.headers.BasicHttpCredentials import akka.http.scaladsl.server.Route import org.apache.openwhisk.common.{Logging, TransactionId} import org.apache.openwhisk.http.BasicRasService import org.apache.openwhisk.http.ErrorResponse.terminate +import spray.json.DefaultJsonProtocol._ import spray.json._ import scala.concurrent.ExecutionContext @@ -32,7 +34,7 @@ import scala.concurrent.ExecutionContext * Implements web server to handle certain REST API calls. * Currently provides a health ping route, only. */ -class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)( +class FPCSchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)( implicit val ec: ExecutionContext, implicit val actorSystem: ActorSystem, implicit val logger: Logging) @@ -41,10 +43,28 @@ class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPa override def routes(implicit transid: TransactionId): Route = { super.routes ~ extractCredentials { case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword => - (path("disable") & post) { + (path("state") & get) { + complete { + scheduler.getState.map { + case (list, creationCount) => + (list + .map(scheduler => scheduler._1.asString -> scheduler._2.toString) + .toMap + ++ Map("creationCount" -> creationCount.toString)).toJson.asJsObject + } + } + } ~ (path("disable") & post) { logger.warn(this, "Scheduler is disabled") scheduler.disable() complete("scheduler disabled") + } ~ (path(FPCSchedulerServer.queuePathPrefix / "total") & get) { + complete { + scheduler.getQueueSize.map(_.toString) + } + } ~ (path(FPCSchedulerServer.queuePathPrefix / "status") & get) { + complete { + scheduler.getQueueStatusData.map(s => s.toJson) + } } case _ => implicit val jsonPrettyResponsePrinter = PrettyPrinter @@ -53,21 +73,16 @@ class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPa } } -object SchedulerServer { +object FPCSchedulerServer { - val schedulerUsername = { - val source = scala.io.Source.fromFile("/conf/schedulerauth.username") - try source.mkString.replaceAll("\r|\n", "") - finally source.close() - } - val schedulerPassword = { - val source = scala.io.Source.fromFile("/conf/schedulerauth.password") - try source.mkString.replaceAll("\r|\n", "") - finally source.close() - } + // TODO: TBD, after FPCScheduler is ready, can read the credentials from pureconfig + val schedulerUsername = "admin" + val schedulerPassword = "admin" + + val queuePathPrefix = "queue" def instance(scheduler: SchedulerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService = - new SchedulerServer(scheduler, schedulerUsername, schedulerPassword) + new FPCSchedulerServer(scheduler, schedulerUsername, schedulerPassword) } diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala index 9fc793bed1f..6bb4311953a 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala @@ -81,21 +81,20 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE val durationChecker = "" // TODO: TBD override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = { - Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD + Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD, after etcdClient is ready, can implement it } override def getQueueSize: Future[Int] = { - Future.successful(0) // TODO: TBD + Future.successful(0) // TODO: TBD, after queueManager is ready, can implement it } - override def getQueueStatusData: Future[List[String]] = { - Future.successful(List("")) // TODO: TBD + override def getQueueStatusData: Future[List[StatusData]] = { + Future.successful(List(StatusData("ns", "fqn", 0, "Running", "data"))) // TODO: TBD, after queueManager is ready, can implement it } - // other components don't need to shutdown gracefully override def disable(): Unit = { logging.info(this, s"Gracefully shutting down the scheduler") - // TODO: TBD, gracefully shut down the container manager and queue manager + // TODO: TBD, after containerManager and queueManager are ready, can implement it } private def getUserLimit(invocationNamespace: String): Future[Int] = { @@ -160,7 +159,7 @@ trait SchedulerCore { def getQueueSize: Future[Int] - def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string + def getQueueStatusData: Future[List[StatusData]] def disable(): Unit } @@ -253,7 +252,7 @@ object Scheduler { val httpsConfig = if (Scheduler.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None - BasicHttpService.startHttpService(SchedulerServer.instance(scheduler).route, port, httpsConfig)( + BasicHttpService.startHttpService(FPCSchedulerServer.instance(scheduler).route, port, httpsConfig)( actorSystem, ActorMaterializer.create(actorSystem)) diff --git a/tests/build.gradle b/tests/build.gradle index 57c3bd733c1..49fac127831 100644 --- a/tests/build.gradle +++ b/tests/build.gradle @@ -325,6 +325,7 @@ task copyMeasurementFiles() { doLast{ Project common = project(":common:scala") Project controller = project(":core:controller") + Project scheduler = project(":core:scheduler") Project invoker = project(":core:invoker") Properties wskProps = loadWhiskProps() @@ -335,6 +336,8 @@ task copyMeasurementFiles() { copyAndRenameMeasurementFile(covLogs, 'controller', "common", common) copyAndRenameMeasurementFile(covLogs, 'controller', "controller", controller) + copyAndRenameMeasurementFile(covLogs, 'scheduler', "common", common) + copyAndRenameMeasurementFile(covLogs, 'scheduler', "scheduler", scheduler) copyAndRenameMeasurementFile(covLogs, 'invoker', "common", common) copyAndRenameMeasurementFile(covLogs, 'invoker', "invoker", invoker) } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala new file mode 100644 index 00000000000..0dab4f4ce91 --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/FPCSchedulerServerTests.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.scheduler + +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.model.StatusCodes._ +import akka.http.scaladsl.model.headers.BasicHttpCredentials +import akka.http.scaladsl.server.Route +import akka.http.scaladsl.testkit.ScalatestRouteTest +import common.StreamLogging +import org.apache.openwhisk.common.TransactionId +import org.apache.openwhisk.core.connector.StatusData +import org.apache.openwhisk.core.entity.SchedulerInstanceId +import org.junit.runner.RunWith +import org.scalamock.scalatest.MockFactory +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers} +import spray.json.DefaultJsonProtocol._ +import spray.json._ + +import scala.concurrent.Future + +/** + * Tests SchedulerServer API. + */ +@RunWith(classOf[JUnitRunner]) +class FPCSchedulerServerTests + extends FlatSpec + with BeforeAndAfterEach + with BeforeAndAfterAll + with ScalatestRouteTest + with Matchers + with StreamLogging + with MockFactory { + + def transid() = TransactionId("tid") + + val systemUsername = "username" + val systemPassword = "password" + + val queues = List((SchedulerInstanceId("0"), 2), (SchedulerInstanceId("1"), 3)) + val creationCount = 1 + val testQueueSize = 2 + val statusDatas = List( + StatusData("testns1", "testaction1", 10, "Running", "RunningData"), + StatusData("testns2", "testaction2", 5, "Running", "RunningData")) + + // Create scheduler + val scheduler = new TestScheduler(queues, creationCount, testQueueSize, statusDatas) + val server = new FPCSchedulerServer(scheduler, systemUsername, systemPassword) + + override protected def afterEach(): Unit = scheduler.reset() + + /** FPCSchedulerServer API tests */ + behavior of "FPCSchedulerServer API" + + // POST /disable + it should "disable scheduler" in { + implicit val tid = transid() + val validCredentials = BasicHttpCredentials(systemUsername, systemPassword) + Post(s"/disable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { + status should be(OK) + scheduler.shutdownCount shouldBe 1 + } + } + + // GET /state + it should "get scheduler state" in { + implicit val tid = transid() + val validCredentials = BasicHttpCredentials(systemUsername, systemPassword) + Get(s"/state") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { + status should be(OK) + responseAs[JsObject] shouldBe (queues.map(s => s._1.asString -> s._2.toString).toMap ++ Map( + "creationCount" -> creationCount.toString)).toJson + } + } + + // GET /queue/total + it should "get total queue" in { + implicit val tid = transid() + val validCredentials = BasicHttpCredentials(systemUsername, systemPassword) + Get(s"/queue/total") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { + status should be(OK) + responseAs[String] shouldBe testQueueSize.toString + } + } + + // GET /queue/status + it should "get all queue status" in { + implicit val tid = transid() + val validCredentials = BasicHttpCredentials(systemUsername, systemPassword) + Get(s"/queue/status") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { + status should be(OK) + responseAs[List[JsObject]] shouldBe statusDatas.map(_.toJson) + } + } + + // POST /disable with invalid credential + it should "not call scheduler api with invalid credential" in { + implicit val tid = transid() + val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass") + Post(s"/disable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check { + status should be(Unauthorized) + scheduler.shutdownCount shouldBe 0 + } + } + + // POST /disable with empty credential + it should "not call scheduler api with empty credential" in { + implicit val tid = transid() + Post(s"/disable") ~> Route.seal(server.routes(tid)) ~> check { + status should be(Unauthorized) + scheduler.shutdownCount shouldBe 0 + } + } + +} + +class TestScheduler(schedulerStates: List[(SchedulerInstanceId, Int)], + creationCount: Int, + queueSize: Int, + statusDatas: List[StatusData]) + extends SchedulerCore { + var shutdownCount = 0 + + override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = + Future.successful(schedulerStates, creationCount) + + override def getQueueSize: Future[Int] = Future.successful(queueSize) + + override def getQueueStatusData: Future[List[StatusData]] = Future.successful(statusDatas) + + override def disable(): Unit = shutdownCount += 1 + + def reset(): Unit = { + shutdownCount = 0 + } +}