From e036382b395c6489a254e686485272b520d4cd23 Mon Sep 17 00:00:00 2001 From: Brendan Doyle Date: Wed, 23 Mar 2022 17:12:58 -0700 Subject: [PATCH 1/7] add enable/disable invoker support to old scheduler and add is enabled route for invoker --- .../core/invoker/DefaultInvokerServer.scala | 2 + .../core/invoker/FPCInvokerReactive.scala | 7 +++- .../core/invoker/FPCInvokerServer.scala | 2 + .../openwhisk/core/invoker/Invoker.scala | 6 +++ .../core/invoker/InvokerReactive.scala | 37 ++++++++++++++----- 5 files changed, 44 insertions(+), 10 deletions(-) diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala index d372be79783..f2c4e56ba08 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala @@ -47,6 +47,8 @@ class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, sys invoker.enable() } ~ (path("disable") & post) { invoker.disable() + } ~ (path("isEnabled") & get) { + invoker.isEnabled() } case _ => terminate(StatusCodes.Unauthorized) } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala index 8158fd3ab27..647811509f5 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala @@ -34,11 +34,12 @@ import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider import org.apache.openwhisk.core.containerpool.v2._ import org.apache.openwhisk.core.database.{UserContext, _} import org.apache.openwhisk.core.entity._ -import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.{containerPrefix} +import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, SchedulerKeys} import org.apache.openwhisk.core.etcd.EtcdType._ import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig} +import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates} import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService} import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig} @@ -402,6 +403,10 @@ class FPCInvokerReactive(config: WhiskConfig, complete("Successfully disabled invoker") } + override def isEnabled(): Route = { + complete(InvokerEnabled(consumer.nonEmpty && warmUpWatcher.nonEmpty).toJson().compactPrint) + } + override def backfillPrewarm(): Route = { pool ! AdjustPrewarmedContainer complete("backfilling prewarm container") diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala index f1b8e8cd729..a3b800e1e77 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala @@ -47,6 +47,8 @@ class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemP invoker.enable() } ~ (path("disable") & post) { invoker.disable() + } ~ (path("isEnabled") & get) { + invoker.isEnabled() } case _ => terminate(StatusCodes.Unauthorized) } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala index 6de751f2ec9..59027133bca 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala @@ -35,6 +35,7 @@ import org.apache.openwhisk.spi.{Spi, SpiLoader} import org.apache.openwhisk.utils.ExecutionContextFactory import pureconfig._ import pureconfig.generic.auto._ +import spray.json.{JsBoolean, JsObject} import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} @@ -74,6 +75,10 @@ object Invoker { val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix) + case class InvokerEnabled(isEnabled: Boolean) { + def toJson(): JsObject = JsObject("enabled" -> JsBoolean(isEnabled)) + } + /** * An object which records the environment variables required for this component to run. */ @@ -220,6 +225,7 @@ trait InvokerProvider extends Spi { trait InvokerCore { def enable(): Route def disable(): Route + def isEnabled(): Route def backfillPrewarm(): Route } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index e32ece2fce8..3e1f3508395 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -19,9 +19,8 @@ package org.apache.openwhisk.core.invoker import java.nio.charset.StandardCharsets import java.time.Instant - import akka.Done -import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props} +import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, Props} import akka.event.Logging.InfoLevel import akka.http.scaladsl.server.Directives.complete import akka.http.scaladsl.server.Route @@ -34,6 +33,7 @@ import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider import org.apache.openwhisk.core.database.{UserContext, _} import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ +import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} import org.apache.openwhisk.http.Messages import org.apache.openwhisk.spi.SpiLoader @@ -46,7 +46,6 @@ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} object InvokerReactive extends InvokerProvider { - override def instance( config: WhiskConfig, instance: InvokerInstanceId, @@ -293,18 +292,38 @@ class InvokerReactive( } private val healthProducer = msgProvider.getProducer(config) - Scheduler.scheduleWaitAtMost(1.seconds)(() => { - healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance)).andThen { - case Failure(t) => logging.error(this, s"failed to ping the controller: $t") - } - }) + + def getHealthScheduler: ActorRef = + Scheduler.scheduleWaitAtMost(1.seconds)(() => { + healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance)).andThen { + case Failure(t) => logging.error(this, s"failed to ping the controller: $t") + } + }) + + var healthScheduler: Option[ActorRef] = Some(getHealthScheduler) override def enable(): Route = { + if (healthScheduler.isEmpty) { + healthScheduler = Some(getHealthScheduler) + complete(s"${instance.toString} is now enabled.") + } else { + complete(s"${instance.toString} is already enabled.") + } complete("not supported") } override def disable(): Route = { - complete("not supported") + if (healthScheduler.nonEmpty) { + actorSystem.stop(healthScheduler.get) + healthScheduler = None + complete(s"${instance.toString} is now disabled.") + } else { + complete(s"${instance.toString} is already disabled.") + } + } + + override def isEnabled(): Route = { + complete(InvokerEnabled(healthScheduler.nonEmpty).toJson().compactPrint) } override def backfillPrewarm(): Route = { From 96639f8ffa2541c19eaad25f16400fd4829490ba Mon Sep 17 00:00:00 2001 From: Brendan Doyle Date: Wed, 23 Mar 2022 19:11:24 -0700 Subject: [PATCH 2/7] feedback --- .../org/apache/openwhisk/core/invoker/InvokerReactive.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index 3e1f3508395..a9830267167 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -300,7 +300,7 @@ class InvokerReactive( } }) - var healthScheduler: Option[ActorRef] = Some(getHealthScheduler) + private var healthScheduler: Option[ActorRef] = Some(getHealthScheduler) override def enable(): Route = { if (healthScheduler.isEmpty) { From a53c12c69fa86f16ca96d4b466b62f280bf5aee5 Mon Sep 17 00:00:00 2001 From: Brendan Doyle Date: Thu, 24 Mar 2022 09:51:31 -0700 Subject: [PATCH 3/7] fix enable complete --- .../org/apache/openwhisk/core/invoker/InvokerReactive.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index a9830267167..230b84aa75c 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -309,7 +309,6 @@ class InvokerReactive( } else { complete(s"${instance.toString} is already enabled.") } - complete("not supported") } override def disable(): Route = { From 0af274498068a9a6af0d0b2fa2a4f920c585b3c9 Mon Sep 17 00:00:00 2001 From: Brendan Doyle Date: Thu, 24 Mar 2022 13:14:12 -0700 Subject: [PATCH 4/7] fail fast invoker to offline with updated ping message --- .../openwhisk/core/connector/Message.scala | 6 ++++-- .../loadBalancer/InvokerSupervision.scala | 20 ++++++++++++++----- .../core/invoker/InvokerReactive.scala | 15 ++++++++------ 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala index 9123747d908..f9fea82e1fe 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala @@ -278,14 +278,16 @@ object AcknowledegmentMessage extends DefaultJsonProtocol { } } -case class PingMessage(instance: InvokerInstanceId) extends Message { +case class PingMessage(instance: InvokerInstanceId, isEnabled: Option[Boolean] = None) extends Message { override def serialize = PingMessage.serdes.write(this).compactPrint + + def invokerEnabled: Boolean = isEnabled.getOrElse(true) } object PingMessage extends DefaultJsonProtocol { def parse(msg: String) = Try(serdes.read(msg.parseJson)) - implicit val serdes = jsonFormat(PingMessage.apply _, "name") + implicit val serdes = jsonFormat(PingMessage.apply _, "name", "isEnabled") } trait EventMessageBody extends Message { diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala index 86ece17aff3..f526da0e738 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala @@ -280,8 +280,8 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr // To be used for all states that should send test actions to reverify the invoker val healthPingingState: StateFunction = { - case Event(_: PingMessage, _) => stay - case Event(StateTimeout, _) => goto(Offline) + case Event(ping: PingMessage, _) => goOfflineIfDisabled(ping) + case Event(StateTimeout, _) => goto(Offline) case Event(Tick, _) => invokeTestAction() stay @@ -300,7 +300,7 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr /** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */ when(Offline) { - case Event(_: PingMessage, _) => goto(Unhealthy) + case Event(ping: PingMessage, _) => if (ping.invokerEnabled) goto(Unhealthy) else stay } /** An Unhealthy invoker represents an invoker that was not able to handle actions successfully. */ @@ -314,8 +314,8 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr * It will go offline if that state is not confirmed for 20 seconds. */ when(Healthy, stateTimeout = healthyTimeout) { - case Event(_: PingMessage, _) => stay - case Event(StateTimeout, _) => goto(Offline) + case Event(ping: PingMessage, _) => goOfflineIfDisabled(ping) + case Event(StateTimeout, _) => goto(Offline) } /** Handles the completion of an Activation in every state. */ @@ -339,6 +339,16 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr initialize() + /** + * Handling for if a ping message from an invoker signals that it has been disabled to immediately transition to Offline. + * + * @param ping + * @return + */ + private def goOfflineIfDisabled(ping: PingMessage) = { + if (ping.invokerEnabled) stay else goto(Offline) + } + /** * Handling for active acks. This method saves the result (successful or unsuccessful) * into an RingBuffer and checks, if the InvokerActor has to be changed to UnHealthy. diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index 230b84aa75c..7dc777b6c4d 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -293,12 +293,14 @@ class InvokerReactive( private val healthProducer = msgProvider.getProducer(config) - def getHealthScheduler: ActorRef = - Scheduler.scheduleWaitAtMost(1.seconds)(() => { - healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance)).andThen { - case Failure(t) => logging.error(this, s"failed to ping the controller: $t") - } - }) + private def getHealthScheduler: ActorRef = + Scheduler.scheduleWaitAtMost(1.seconds)(() => pingController(isEnabled = true)) + + private def pingController(isEnabled: Boolean) = { + healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance, isEnabled = Some(isEnabled))).andThen { + case Failure(t) => logging.error(this, s"failed to ping the controller: $t") + } + } private var healthScheduler: Option[ActorRef] = Some(getHealthScheduler) @@ -312,6 +314,7 @@ class InvokerReactive( } override def disable(): Route = { + pingController(isEnabled = false) if (healthScheduler.nonEmpty) { actorSystem.stop(healthScheduler.get) healthScheduler = None From 15517c3313a815205d5f579a665bb21d1adc94b5 Mon Sep 17 00:00:00 2001 From: Brendan Doyle Date: Fri, 25 Mar 2022 11:21:30 -0700 Subject: [PATCH 5/7] test compilation --- .../core/invoker/test/DefaultInvokerServerTests.scala | 4 ++++ .../openwhisk/core/invoker/test/FPCInvokerServerTests.scala | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala index 109512892f4..c75151ad67f 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala @@ -130,6 +130,10 @@ class TestInvokerReactive extends InvokerCore with BasicHttpService { complete("") } + override def isEnabled(): Route = { + complete("") + } + override def backfillPrewarm(): Route = { complete("") } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala index e387cd637c2..b723af992cd 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala @@ -129,6 +129,10 @@ class TestFPCInvokerReactive extends InvokerCore with BasicHttpService { complete("") } + override def isEnabled(): Route = { + complete("") + } + override def backfillPrewarm(): Route = { complete("") } From 6e7d5ab59ccbcdb8c5b774013dfe866ebacb2105 Mon Sep 17 00:00:00 2001 From: Brendan Doyle Date: Fri, 25 Mar 2022 17:18:34 -0700 Subject: [PATCH 6/7] add tests --- .../openwhisk/core/connector/Message.scala | 2 +- .../core/invoker/FPCInvokerReactive.scala | 2 +- .../openwhisk/core/invoker/Invoker.scala | 9 +++-- .../core/invoker/InvokerReactive.scala | 2 +- .../test/DefaultInvokerServerTests.scala | 12 ++++++- .../invoker/test/FPCInvokerServerTests.scala | 12 ++++++- .../test/InvokerSupervisionTests.scala | 34 +++++++++++++++++++ 7 files changed, 66 insertions(+), 7 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala index f9fea82e1fe..e823cf109cf 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala @@ -287,7 +287,7 @@ case class PingMessage(instance: InvokerInstanceId, isEnabled: Option[Boolean] = object PingMessage extends DefaultJsonProtocol { def parse(msg: String) = Try(serdes.read(msg.parseJson)) - implicit val serdes = jsonFormat(PingMessage.apply _, "name", "isEnabled") + implicit val serdes = jsonFormat(PingMessage.apply, "name", "isEnabled") } trait EventMessageBody extends Message { diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala index 647811509f5..9087bbd58a0 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala @@ -404,7 +404,7 @@ class FPCInvokerReactive(config: WhiskConfig, } override def isEnabled(): Route = { - complete(InvokerEnabled(consumer.nonEmpty && warmUpWatcher.nonEmpty).toJson().compactPrint) + complete(InvokerEnabled(consumer.nonEmpty && warmUpWatcher.nonEmpty).serialize()) } override def backfillPrewarm(): Route = { diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala index 59027133bca..89656c98148 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala @@ -35,7 +35,7 @@ import org.apache.openwhisk.spi.{Spi, SpiLoader} import org.apache.openwhisk.utils.ExecutionContextFactory import pureconfig._ import pureconfig.generic.auto._ -import spray.json.{JsBoolean, JsObject} +import spray.json._ import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} @@ -75,8 +75,13 @@ object Invoker { val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix) + object InvokerEnabled extends DefaultJsonProtocol { + def parseJson(string: String) = Try(serdes.read(string.parseJson)) + implicit val serdes = jsonFormat(InvokerEnabled.apply _, "enabled") + } + case class InvokerEnabled(isEnabled: Boolean) { - def toJson(): JsObject = JsObject("enabled" -> JsBoolean(isEnabled)) + def serialize(): String = InvokerEnabled.serdes.write(this).compactPrint } /** diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index 7dc777b6c4d..31d1f22930c 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -325,7 +325,7 @@ class InvokerReactive( } override def isEnabled(): Route = { - complete(InvokerEnabled(healthScheduler.nonEmpty).toJson().compactPrint) + complete(InvokerEnabled(healthScheduler.nonEmpty).serialize()) } override def backfillPrewarm(): Route = { diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala index c75151ad67f..7d81035221e 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala @@ -23,6 +23,7 @@ import akka.http.scaladsl.server.Route import akka.http.scaladsl.testkit.ScalatestRouteTest import common.StreamLogging import org.apache.openwhisk.common.TransactionId +import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled import org.apache.openwhisk.core.invoker.{DefaultInvokerServer, InvokerCore} import org.apache.openwhisk.http.BasicHttpService import org.junit.runner.RunWith @@ -76,6 +77,15 @@ class DefaultInvokerServerTests } } + it should "check if invoker is enabled" in { + implicit val tid = transid() + val validCredentials = BasicHttpCredentials(systemUsername, systemPassword) + Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { + status should be(OK) + InvokerEnabled.parseJson(responseEntity.toString) shouldEqual InvokerEnabled(true) + } + } + it should "not enable invoker with invalid credential" in { implicit val tid = transid() val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass") @@ -131,7 +141,7 @@ class TestInvokerReactive extends InvokerCore with BasicHttpService { } override def isEnabled(): Route = { - complete("") + complete(InvokerEnabled(true).serialize()) } override def backfillPrewarm(): Route = { diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala index b723af992cd..39eabd68b65 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala @@ -23,6 +23,7 @@ import akka.http.scaladsl.server.Route import akka.http.scaladsl.testkit.ScalatestRouteTest import common.StreamLogging import org.apache.openwhisk.common.TransactionId +import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled import org.apache.openwhisk.core.invoker.{FPCInvokerServer, InvokerCore} import org.apache.openwhisk.http.BasicHttpService import org.junit.runner.RunWith @@ -76,6 +77,15 @@ class FPCInvokerServerTests } } + it should "check if invoker is enabled" in { + implicit val tid = transid() + val validCredentials = BasicHttpCredentials(systemUsername, systemPassword) + Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { + status should be(OK) + InvokerEnabled.parseJson(responseEntity.toString) shouldEqual InvokerEnabled(true) + } + } + it should "not enable invoker with invalid credential" in { implicit val tid = transid() val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass") @@ -130,7 +140,7 @@ class TestFPCInvokerReactive extends InvokerCore with BasicHttpService { } override def isEnabled(): Route = { - complete("") + complete(InvokerEnabled(true).serialize()) } override def backfillPrewarm(): Route = { diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala index a4a6145e5f5..463005d6c2b 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala @@ -364,6 +364,18 @@ class InvokerSupervisionTests } } + // unhealthy -> offline + // offline -> off + it should "go offline when unhealthy and disabled invoker ping received and stay offline if disabled ping received while offline" in { + val invoker = + TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0"))) + invoker.stateName shouldBe Unhealthy + invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory), Some(false)) + invoker.stateName shouldBe Offline + invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory), Some(false)) + invoker.stateName shouldBe Offline + } + it should "start timer to send test actions when unhealthy" in { val invoker = TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0"))) @@ -382,6 +394,28 @@ class InvokerSupervisionTests invoker.isTimerActive(InvokerActor.timerName) shouldBe false } + // healthy -> offline + it should "go offline from healthy immediately when disabled invoker ping received" in { + val invoker = + TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0"))) + invoker.stateName shouldBe Unhealthy + + invoker.isTimerActive(InvokerActor.timerName) shouldBe true + + // Fill buffer with successful invocations to become healthy again (one below errorTolerance) + (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ => + invoker ! InvocationFinishedMessage( + InvokerInstanceId(0, userMemory = defaultUserMemory), + InvocationFinishedResult.Success) + } + invoker.stateName shouldBe Healthy + + invoker.isTimerActive(InvokerActor.timerName) shouldBe false + + invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory), Some(false)) + invoker.stateName shouldBe Offline + } + it should "initially store invoker status with its full id - instance/uniqueName/displayedName" in { val invoker0 = TestProbe() val children = mutable.Queue(invoker0.ref) From 94d37af586bac42fc35e910aeaaec804459bb3b7 Mon Sep 17 00:00:00 2001 From: Brendan Doyle Date: Sat, 26 Mar 2022 13:41:16 -0700 Subject: [PATCH 7/7] fix tests --- .../core/invoker/test/DefaultInvokerServerTests.scala | 5 ++++- .../openwhisk/core/invoker/test/FPCInvokerServerTests.scala | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala index 7d81035221e..57cb976d242 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala @@ -21,6 +21,7 @@ import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized} import akka.http.scaladsl.model.headers.BasicHttpCredentials import akka.http.scaladsl.server.Route import akka.http.scaladsl.testkit.ScalatestRouteTest +import akka.http.scaladsl.unmarshalling.Unmarshal import common.StreamLogging import org.apache.openwhisk.common.TransactionId import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled @@ -82,7 +83,9 @@ class DefaultInvokerServerTests val validCredentials = BasicHttpCredentials(systemUsername, systemPassword) Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { status should be(OK) - InvokerEnabled.parseJson(responseEntity.toString) shouldEqual InvokerEnabled(true) + Unmarshal(responseEntity).to[String].map(response => { + InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true) + }) } } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala index 39eabd68b65..e7ab02e2bb8 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala @@ -21,6 +21,7 @@ import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized} import akka.http.scaladsl.model.headers.BasicHttpCredentials import akka.http.scaladsl.server.Route import akka.http.scaladsl.testkit.ScalatestRouteTest +import akka.http.scaladsl.unmarshalling.Unmarshal import common.StreamLogging import org.apache.openwhisk.common.TransactionId import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled @@ -82,7 +83,9 @@ class FPCInvokerServerTests val validCredentials = BasicHttpCredentials(systemUsername, systemPassword) Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { status should be(OK) - InvokerEnabled.parseJson(responseEntity.toString) shouldEqual InvokerEnabled(true) + Unmarshal(responseEntity).to[String].map(response => { + InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true) + }) } }