diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala index 257dd07c703..e8e58b67c69 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala @@ -232,6 +232,7 @@ object TransactionId { val systemPrefix = "sid_" + var containerCreation = TransactionId(systemPrefix + "containerCreation") val unknown = TransactionId(systemPrefix + "unknown") val testing = TransactionId(systemPrefix + "testing") // Common id for for unit testing val invoker = TransactionId(systemPrefix + "invoker") // Invoker startup/shutdown or GC activity diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala index a050ce74609..7c0ec2b3998 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala @@ -85,6 +85,10 @@ class WhiskConfig(requiredProperties: Map[String, String], val triggerFirePerMinuteLimit = this(WhiskConfig.triggerFirePerMinuteLimit) val actionSequenceLimit = this(WhiskConfig.actionSequenceMaxLimit) val controllerSeedNodes = this(WhiskConfig.controllerSeedNodes) + + val schedulerHost = this(WhiskConfig.schedulerHost) + val schedulerRpcPort = this(WhiskConfig.schedulerRpcPort) + val schedulerAkkaPort = this(WhiskConfig.schedulerAkkaPort) } object WhiskConfig { @@ -190,6 +194,10 @@ object WhiskConfig { val actionInvokeConcurrentLimit = "limits.actions.invokes.concurrent" val triggerFirePerMinuteLimit = "limits.triggers.fires.perMinute" val controllerSeedNodes = "akka.cluster.seed.nodes" + + val schedulerHost = "whisk.scheduler.endpoints.host" + val schedulerRpcPort = "whisk.scheduler.endpoints.rpcPort" + val schedulerAkkaPort = "whisk.scheduler.endpoints.akkaPort" } object ConfigKeys { diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala index 5ca2a71be8c..22e7a60d12c 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala @@ -426,3 +426,33 @@ object EventMessage extends DefaultJsonProtocol { def parse(msg: String) = Try(format.read(msg.parseJson)) } + +/** + * This case class is used when retrieving the snapshot of the queue status from the scheduler at a certain moment. + * This is useful to figure out the internal status when any issue happens. + * The following would be an example result. + * + * [ + * ... + * { + * "data": "RunningData", + * "fqn": "whisk.system/elasticsearch/status-alarm@0.0.2", + * "invocationNamespace": "style95", + * "status": "Running", + * "waitingActivation": 1 + * }, + * ... + * ] + */ +object StatusQuery +case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String) + extends Message { + + override def serialize: String = StatusData.serdes.write(this).compactPrint + +} +object StatusData extends DefaultJsonProtocol { + + implicit val serdes = + jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", "waitingActivation", "status", "data") +} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala index e80720fba71..0421e9b72d0 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala @@ -57,6 +57,17 @@ case class ControllerInstanceId(asString: String) extends InstanceId { override val toJson: JsValue = ControllerInstanceId.serdes.write(this) } +case class SchedulerInstanceId(val asString: String) extends InstanceId { + validate(asString) + override val instanceType = "scheduler" + + override val source = s"$instanceType$asString" + + override val toString: String = source + + override val toJson: JsValue = SchedulerInstanceId.serdes.write(this) +} + object InvokerInstanceId extends DefaultJsonProtocol { def parse(c: String): Try[InvokerInstanceId] = Try(serdes.read(c.parseJson)) @@ -112,6 +123,10 @@ object ControllerInstanceId extends DefaultJsonProtocol { } } +object SchedulerInstanceId extends DefaultJsonProtocol { + implicit val serdes = jsonFormat(SchedulerInstanceId.apply _, "asString") +} + trait InstanceId { // controller ids become part of a kafka topic, hence, hence allow only certain characters diff --git a/core/scheduler/Dockerfile b/core/scheduler/Dockerfile new file mode 100644 index 00000000000..244d80d7b05 --- /dev/null +++ b/core/scheduler/Dockerfile @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM scala + +ENV UID=1001 \ + NOT_ROOT_USER=owuser + +# Copy app jars +ADD build/distributions/scheduler.tar / + +COPY init.sh / +RUN chmod +x init.sh + +RUN adduser -D -u ${UID} -h /home/${NOT_ROOT_USER} -s /bin/bash ${NOT_ROOT_USER} +USER ${NOT_ROOT_USER} + +EXPOSE 8080 +CMD ["./init.sh", "0"] diff --git a/core/scheduler/build.gradle b/core/scheduler/build.gradle new file mode 100644 index 00000000000..530c9624fea --- /dev/null +++ b/core/scheduler/build.gradle @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: 'scala' +apply plugin: 'application' +apply plugin: 'eclipse' +apply plugin: 'maven' +apply plugin: 'org.scoverage' + +ext.dockerImageName = 'scheduler' +apply from: '../../gradle/docker.gradle' +distDocker.dependsOn ':common:scala:distDocker', 'distTar' + +project.archivesBaseName = "openwhisk-scheduler" + +ext.coverageDirs = [ + "${buildDir}/classes/scala/scoverage", + "${project(':common:scala').buildDir.absolutePath}/classes/scala/scoverage" +] +distDockerCoverage.dependsOn ':common:scala:scoverageClasses', 'scoverageClasses' + +// Define a separate configuration for managing the dependency on Jetty ALPN agent. +configurations { + alpnagent +} + +dependencies { + configurations.all { + resolutionStrategy.force "com.lihaoyi:fastparse_${gradle.scala.depVersion}:2.1.3" + resolutionStrategy.force "com.typesafe.akka:akka-http-core_${gradle.scala.depVersion}:${gradle.akka_http.version}" + resolutionStrategy.force "com.typesafe.akka:akka-http_${gradle.scala.depVersion}:${gradle.akka_http.version}" + resolutionStrategy.force "com.typesafe.akka:akka-http2-support_${gradle.scala.depVersion}:${gradle.akka_http.version}" + resolutionStrategy.force "com.typesafe.akka:akka-http-spray-json_${gradle.scala.depVersion}:${gradle.akka_http.version}" + resolutionStrategy.force "com.typesafe.akka:akka-parsing_${gradle.scala.depVersion}:${gradle.akka_http.version}" + resolutionStrategy.force "com.typesafe.akka:akka-http_${gradle.scala.depVersion}:${gradle.akka_http.version}" + } + + compile "org.scala-lang:scala-library:${gradle.scala.version}" + compile project(':common:scala') + +} + +mainClassName = "org.apache.openwhisk.core.scheduler.Scheduler" +applicationDefaultJvmArgs = ["-Djava.security.egd=file:/dev/./urandom"] diff --git a/core/scheduler/init.sh b/core/scheduler/init.sh new file mode 100644 index 00000000000..8d359d5780c --- /dev/null +++ b/core/scheduler/init.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +./copyJMXFiles.sh + +export SCHEDULER_OPTS +SCHEDULER_OPTS="$SCHEDULER_OPTS -Dakka.remote.netty.tcp.bind-hostname=$(hostname -i) $(./transformEnvironment.sh)" + +exec scheduler/bin/scheduler "$@" diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala new file mode 100644 index 00000000000..9fc793bed1f --- /dev/null +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.scheduler + +import akka.Done +import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown} +import akka.stream.ActorMaterializer +import akka.util.Timeout +import com.typesafe.config.ConfigValueFactory +import kamon.Kamon +import org.apache.openwhisk.common.Https.HttpsConfig +import org.apache.openwhisk.common._ +import org.apache.openwhisk.core.WhiskConfig +import org.apache.openwhisk.core.WhiskConfig.{servicePort, _} +import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender} +import org.apache.openwhisk.core.connector._ +import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext} +import org.apache.openwhisk.core.entity._ +import org.apache.openwhisk.http.BasicHttpService +import org.apache.openwhisk.spi.SpiLoader +import org.apache.openwhisk.utils.ExecutionContextFactory +import pureconfig.loadConfigOrThrow +import spray.json.{DefaultJsonProtocol, _} + +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.language.postfixOps +import scala.util.{Failure, Success, Try} +import pureconfig.generic.auto._ + +class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)( + implicit config: WhiskConfig, + actorSystem: ActorSystem, + materializer: ActorMaterializer, + logging: Logging) + extends SchedulerCore { + implicit val ec = actorSystem.dispatcher + private val authStore = WhiskAuthStore.datastore() + + val msgProvider = SpiLoader.get[MessagingProvider] + val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)) + + val maxPeek = "" // TODO: TBD + val etcdClient = "" // TODO: TBD + val watcherService = "" // TODO: TBD + val leaseService = "" // TODO: TBD + + implicit val entityStore = WhiskEntityStore.datastore() + private val activationStore = + SpiLoader.get[ActivationStoreProvider].instance(actorSystem, materializer, logging) + + private val ack = { + val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None + new MessagingActiveAck(producer, schedulerId, sender) + } + + /** Stores an activation in the database. */ + private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => { + implicit val transid: TransactionId = tid + activationStore.store(activation, context)(tid, notifier = None).andThen { + case Success(doc) => logging.info(this, s"save ${doc} successfully") + case Failure(t) => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}") + } + } + val durationCheckerProvider = "" // TODO: TBD + val durationChecker = "" // TODO: TBD + + override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = { + Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD + } + + override def getQueueSize: Future[Int] = { + Future.successful(0) // TODO: TBD + } + + override def getQueueStatusData: Future[List[String]] = { + Future.successful(List("")) // TODO: TBD + } + + // other components don't need to shutdown gracefully + override def disable(): Unit = { + logging.info(this, s"Gracefully shutting down the scheduler") + // TODO: TBD, gracefully shut down the container manager and queue manager + } + + private def getUserLimit(invocationNamespace: String): Future[Int] = { + Identity + .get(authStore, EntityName(invocationNamespace))(trasnid) + .map { identity => + val limit = identity.limits.concurrentInvocations.getOrElse(config.actionInvokeConcurrentLimit.toInt) + logging.debug(this, s"limit for ${invocationNamespace}: ${limit}")(trasnid) + limit + } + .andThen { + case Failure(_: NoDocumentException) => + logging.warn(this, s"namespace does not exist: $invocationNamespace")(trasnid) + case Failure(_: IllegalStateException) => + logging.warn(this, s"namespace is not unique: $invocationNamespace")(trasnid) + } + } + + private val etcdWorkerFactory = "" // TODO: TBD + + /** + * This component is in charge of storing data to ETCD. + * Even if any error happens we can assume the data will be eventually available in the ETCD by this component. + */ + val dataManagementService = "" // TODO: TBD + + val creationJobManagerFactory = "" // TODO: TBD + + /** + * This component is responsible for creating containers for a given action. + * It relies on the creationJobManager to manage the container creation job. + */ + val containerManager = "" // TODO: TBD + + /** + * This is a factory to create memory queues. + * In the new architecture, each action is given its own dedicated queue. + */ + val memoryQueueFactory = "" // TODO: TBD + + val schedulerConsumer = msgProvider.getConsumer( + config, + s"scheduler${schedulerId.asString}", + s"scheduler${schedulerId.asString}", + 500, // TODO: to be updated with maxPeek variable + maxPollInterval = TimeLimit.MAX_DURATION + 1.minute) + + implicit val trasnid = TransactionId.containerCreation + + /** + * This is one of the major components which take charge of managing queues and coordinating requests among the scheduler, controllers, and invokers. + */ + val queueManager = "" // TODO: TBD + + //val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl()) TODO: TBD +} + +case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None) + +trait SchedulerCore { + def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] + + def getQueueSize: Future[Int] + + def getQueueStatusData: Future[List[String]] // TODO: Change to the real data class other than just string + + def disable(): Unit +} + +object Scheduler { + + protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol") + + /** + * The scheduler has two ports, one for akka-remote and the other for akka-grpc. + */ + def requiredProperties = + Map( + servicePort -> 8080.toString, + schedulerHost -> null, + schedulerAkkaPort -> null, + schedulerRpcPort -> null, + WhiskConfig.actionInvokePerMinuteLimit -> null, + WhiskConfig.actionInvokeConcurrentLimit -> null, + WhiskConfig.triggerFirePerMinuteLimit -> null) ++ + kafkaHosts ++ + zookeeperHosts ++ + wskApiHost ++ + ExecManifest.requiredProperties + + def initKamon(instance: SchedulerInstanceId): Unit = { + // Replace the hostname of the scheduler to the assigned id of the scheduler. + val newKamonConfig = Kamon.config + .withValue("kamon.environment.host", ConfigValueFactory.fromAnyRef(s"scheduler${instance.asString}")) + Kamon.init(newKamonConfig) + } + + def main(args: Array[String]): Unit = { + implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext() + implicit val actorSystem: ActorSystem = + ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec)) + implicit val materializer = ActorMaterializer.create(actorSystem) + + implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this)) + + // Prepare Kamon shutdown + CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () => + logger.info(this, s"Shutting down Kamon with coordinated shutdown") + Kamon.stopModules().map(_ => Done) + } + + def abort(message: String) = { + logger.error(this, message) + actorSystem.terminate() + Await.result(actorSystem.whenTerminated, 30.seconds) + sys.exit(1) + } + + // extract configuration data from the environment + implicit val config = new WhiskConfig(requiredProperties) + if (!config.isValid) { + abort("Bad configuration, cannot start.") + } + + val port = config.servicePort.toInt + val host = config.schedulerHost + val rpcPort = config.schedulerRpcPort.toInt + val akkaPort = config.schedulerAkkaPort.toInt + + // if deploying multiple instances (scale out), must pass the instance number as they need to be uniquely identified. + require(args.length >= 1, "scheduler instance required") + val instanceId = SchedulerInstanceId(args(0)) + + initKamon(instanceId) + + val msgProvider = SpiLoader.get[MessagingProvider] + + Seq( + ("scheduler" + instanceId.asString, "actions", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)), + ("creationAck" + instanceId.asString, "creationAck", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))) + .foreach { + case (topic, topicConfigurationKey, maxMessageBytes) => + if (msgProvider.ensureTopic(config, topic, topicConfigurationKey, maxMessageBytes).isFailure) { + abort(s"failure during msgProvider.ensureTopic for topic $topic") + } + } + + ExecManifest.initialize(config) match { + case Success(_) => + val schedulerEndpoints = SchedulerEndpoints(host, rpcPort, akkaPort) + // Create scheduler + val scheduler = new Scheduler(instanceId, schedulerEndpoints) + + // TODO: Add Akka-grpc handler + val httpsConfig = + if (Scheduler.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None + + BasicHttpService.startHttpService(SchedulerServer.instance(scheduler).route, port, httpsConfig)( + actorSystem, + ActorMaterializer.create(actorSystem)) + + case Failure(t) => + abort(s"Invalid runtimes manifest: $t") + } + } +} +case class SchedulerEndpoints(host: String, rpcPort: Int, akkaPort: Int) { + require(rpcPort != 0 || akkaPort != 0) + def asRpcEndpoint: String = s"$host:$rpcPort" + def asAkkaEndpoint: String = s"$host:$akkaPort" + + def getRemoteRef(name: String)(implicit context: ActorRefFactory): ActorSelection = { + implicit val ec = context.dispatcher + + val path = s"akka.tcp://scheduler-actor-system@${asAkkaEndpoint}/user/${name}" + context.actorSelection(path) + } + + def serialize = SchedulerEndpoints.serdes.write(this).compactPrint +} + +object SchedulerEndpoints extends DefaultJsonProtocol { + implicit val serdes = jsonFormat(SchedulerEndpoints.apply, "host", "rpcPort", "akkaPort") + def parse(endpoints: String) = Try(serdes.read(endpoints.parseJson)) +} + +case class SchedulerStates(sid: SchedulerInstanceId, queueSize: Int, endpoints: SchedulerEndpoints) { + private implicit val askTimeout = Timeout(5 seconds) + + def getRemoteRef(name: String)(implicit context: ActorRefFactory): ActorSelection = { + implicit val ec = context.dispatcher + + val path = s"akka.tcp://scheduler-actor-system@${endpoints.asAkkaEndpoint}/user/${name}" + context.actorSelection(path) + } + + def getSchedulerId(): SchedulerInstanceId = sid + + def serialize = SchedulerStates.serdes.write(this).compactPrint +} + +object SchedulerStates extends DefaultJsonProtocol { + private implicit val endpointsSerde = SchedulerEndpoints.serdes + implicit val serdes = jsonFormat(SchedulerStates.apply, "sid", "queueSize", "endpoints") + + def parse(states: String) = Try(serdes.read(states.parseJson)) +} diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala new file mode 100644 index 00000000000..841b1390155 --- /dev/null +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/SchedulerServer.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.scheduler + +import akka.actor.ActorSystem +import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.model.headers.BasicHttpCredentials +import akka.http.scaladsl.server.Route +import org.apache.openwhisk.common.{Logging, TransactionId} +import org.apache.openwhisk.http.BasicRasService +import org.apache.openwhisk.http.ErrorResponse.terminate +import spray.json._ + +import scala.concurrent.ExecutionContext + +/** + * Implements web server to handle certain REST API calls. + * Currently provides a health ping route, only. + */ +class SchedulerServer(scheduler: SchedulerCore, systemUsername: String, systemPassword: String)( + implicit val ec: ExecutionContext, + implicit val actorSystem: ActorSystem, + implicit val logger: Logging) + extends BasicRasService { + + override def routes(implicit transid: TransactionId): Route = { + super.routes ~ extractCredentials { + case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword => + (path("disable") & post) { + logger.warn(this, "Scheduler is disabled") + scheduler.disable() + complete("scheduler disabled") + } + case _ => + implicit val jsonPrettyResponsePrinter = PrettyPrinter + terminate(StatusCodes.Unauthorized) + } + } +} + +object SchedulerServer { + + val schedulerUsername = { + val source = scala.io.Source.fromFile("/conf/schedulerauth.username") + try source.mkString.replaceAll("\r|\n", "") + finally source.close() + } + val schedulerPassword = { + val source = scala.io.Source.fromFile("/conf/schedulerauth.password") + try source.mkString.replaceAll("\r|\n", "") + finally source.close() + } + + def instance(scheduler: SchedulerCore)(implicit ec: ExecutionContext, + actorSystem: ActorSystem, + logger: Logging): BasicRasService = + new SchedulerServer(scheduler, schedulerUsername, schedulerPassword) +} diff --git a/settings.gradle b/settings.gradle index df0ee979a86..4792aa9d518 100644 --- a/settings.gradle +++ b/settings.gradle @@ -18,6 +18,7 @@ include 'common:scala' include 'core:controller' +include 'core:scheduler' include 'core:invoker' include 'core:cosmosdb:cache-invalidator' include 'core:standalone'