Skip to content

add enable/disable invoker support to old scheduler #5205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,14 +278,16 @@ object AcknowledegmentMessage extends DefaultJsonProtocol {
}
}

case class PingMessage(instance: InvokerInstanceId) extends Message {
case class PingMessage(instance: InvokerInstanceId, isEnabled: Option[Boolean] = None) extends Message {
override def serialize = PingMessage.serdes.write(this).compactPrint

def invokerEnabled: Boolean = isEnabled.getOrElse(true)
}

object PingMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))

implicit val serdes = jsonFormat(PingMessage.apply _, "name")
implicit val serdes = jsonFormat(PingMessage.apply, "name", "isEnabled")
}

trait EventMessageBody extends Message {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr

// To be used for all states that should send test actions to reverify the invoker
val healthPingingState: StateFunction = {
case Event(_: PingMessage, _) => stay
case Event(StateTimeout, _) => goto(Offline)
case Event(ping: PingMessage, _) => goOfflineIfDisabled(ping)
case Event(StateTimeout, _) => goto(Offline)
case Event(Tick, _) =>
invokeTestAction()
stay
Expand All @@ -300,7 +300,7 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr

/** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
when(Offline) {
case Event(_: PingMessage, _) => goto(Unhealthy)
case Event(ping: PingMessage, _) => if (ping.invokerEnabled) goto(Unhealthy) else stay
}

/** An Unhealthy invoker represents an invoker that was not able to handle actions successfully. */
Expand All @@ -314,8 +314,8 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
* It will go offline if that state is not confirmed for 20 seconds.
*/
when(Healthy, stateTimeout = healthyTimeout) {
case Event(_: PingMessage, _) => stay
case Event(StateTimeout, _) => goto(Offline)
case Event(ping: PingMessage, _) => goOfflineIfDisabled(ping)
case Event(StateTimeout, _) => goto(Offline)
}

/** Handles the completion of an Activation in every state. */
Expand All @@ -339,6 +339,16 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr

initialize()

/**
* Handling for if a ping message from an invoker signals that it has been disabled to immediately transition to Offline.
*
* @param ping
* @return
*/
private def goOfflineIfDisabled(ping: PingMessage) = {
if (ping.invokerEnabled) stay else goto(Offline)
}

/**
* Handling for active acks. This method saves the result (successful or unsuccessful)
* into an RingBuffer and checks, if the InvokerActor has to be changed to UnHealthy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, sys
invoker.enable()
} ~ (path("disable") & post) {
invoker.disable()
} ~ (path("isEnabled") & get) {
invoker.isEnabled()
}
case _ => terminate(StatusCodes.Unauthorized)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.containerpool.v2._
import org.apache.openwhisk.core.database.{UserContext, _}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.{containerPrefix}
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, SchedulerKeys}
import org.apache.openwhisk.core.etcd.EtcdType._
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates}
import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
Expand Down Expand Up @@ -402,6 +403,10 @@ class FPCInvokerReactive(config: WhiskConfig,
complete("Successfully disabled invoker")
}

override def isEnabled(): Route = {
complete(InvokerEnabled(consumer.nonEmpty && warmUpWatcher.nonEmpty).serialize())
}

override def backfillPrewarm(): Route = {
pool ! AdjustPrewarmedContainer
complete("backfilling prewarm container")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemP
invoker.enable()
} ~ (path("disable") & post) {
invoker.disable()
} ~ (path("isEnabled") & get) {
invoker.isEnabled()
}
case _ => terminate(StatusCodes.Unauthorized)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.openwhisk.spi.{Spi, SpiLoader}
import org.apache.openwhisk.utils.ExecutionContextFactory
import pureconfig._
import pureconfig.generic.auto._
import spray.json._

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
Expand Down Expand Up @@ -74,6 +75,15 @@ object Invoker {

val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)

object InvokerEnabled extends DefaultJsonProtocol {
def parseJson(string: String) = Try(serdes.read(string.parseJson))
implicit val serdes = jsonFormat(InvokerEnabled.apply _, "enabled")
}

case class InvokerEnabled(isEnabled: Boolean) {
def serialize(): String = InvokerEnabled.serdes.write(this).compactPrint
}

/**
* An object which records the environment variables required for this component to run.
*/
Expand Down Expand Up @@ -220,6 +230,7 @@ trait InvokerProvider extends Spi {
trait InvokerCore {
def enable(): Route
def disable(): Route
def isEnabled(): Route
def backfillPrewarm(): Route
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ package org.apache.openwhisk.core.invoker

import java.nio.charset.StandardCharsets
import java.time.Instant

import akka.Done
import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.server.Directives.complete
import akka.http.scaladsl.server.Route
Expand All @@ -34,6 +33,7 @@ import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.database.{UserContext, _}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.spi.SpiLoader
Expand All @@ -46,7 +46,6 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object InvokerReactive extends InvokerProvider {

override def instance(
config: WhiskConfig,
instance: InvokerInstanceId,
Expand Down Expand Up @@ -293,18 +292,40 @@ class InvokerReactive(
}

private val healthProducer = msgProvider.getProducer(config)
Scheduler.scheduleWaitAtMost(1.seconds)(() => {
healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance)).andThen {

private def getHealthScheduler: ActorRef =
Scheduler.scheduleWaitAtMost(1.seconds)(() => pingController(isEnabled = true))

private def pingController(isEnabled: Boolean) = {
healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance, isEnabled = Some(isEnabled))).andThen {
case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
}
})
}

private var healthScheduler: Option[ActorRef] = Some(getHealthScheduler)

override def enable(): Route = {
complete("not supported")
if (healthScheduler.isEmpty) {
healthScheduler = Some(getHealthScheduler)
complete(s"${instance.toString} is now enabled.")
} else {
complete(s"${instance.toString} is already enabled.")
}
}

override def disable(): Route = {
complete("not supported")
pingController(isEnabled = false)
if (healthScheduler.nonEmpty) {
actorSystem.stop(healthScheduler.get)
healthScheduler = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JFYI.
While it would be effective, invokers in my downstream are sending a certain message to controllers so that they can make invokers unhealthy immediately to avoid a long-running activation coming to a disabled invoker during the timeout.

But I think this is just a minor tweak.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that on the old scheduler for your downstream or a part of the new scheduler?

If on the old scheduler, I could do that as a follow up since it would require a change to the message bus and logic in the controller to read different health pings off the health topic. But think it's more of an optimization since the controller should recognize it as offline within ten seconds of disabling.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the new scheduler relies on ETCD for health, the logic resides in the old scheduler.

I looked into the downstream code and realize that we just did a workaround to minimize the change by using an instance ID that is less than 0.
I think this would not be an official way but just a workaround.

complete(s"${instance.toString} is now disabled.")
} else {
complete(s"${instance.toString} is already disabled.")
}
}

override def isEnabled(): Route = {
complete(InvokerEnabled(healthScheduler.nonEmpty).serialize())
}

override def backfillPrewarm(): Route = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized}
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.testkit.ScalatestRouteTest
import akka.http.scaladsl.unmarshalling.Unmarshal
import common.StreamLogging
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.invoker.{DefaultInvokerServer, InvokerCore}
import org.apache.openwhisk.http.BasicHttpService
import org.junit.runner.RunWith
Expand Down Expand Up @@ -76,6 +78,17 @@ class DefaultInvokerServerTests
}
}

it should "check if invoker is enabled" in {
implicit val tid = transid()
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
status should be(OK)
Unmarshal(responseEntity).to[String].map(response => {
InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
})
}
}

it should "not enable invoker with invalid credential" in {
implicit val tid = transid()
val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
Expand Down Expand Up @@ -130,6 +143,10 @@ class TestInvokerReactive extends InvokerCore with BasicHttpService {
complete("")
}

override def isEnabled(): Route = {
complete(InvokerEnabled(true).serialize())
}

override def backfillPrewarm(): Route = {
complete("")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized}
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.testkit.ScalatestRouteTest
import akka.http.scaladsl.unmarshalling.Unmarshal
import common.StreamLogging
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.invoker.{FPCInvokerServer, InvokerCore}
import org.apache.openwhisk.http.BasicHttpService
import org.junit.runner.RunWith
Expand Down Expand Up @@ -76,6 +78,17 @@ class FPCInvokerServerTests
}
}

it should "check if invoker is enabled" in {
implicit val tid = transid()
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
status should be(OK)
Unmarshal(responseEntity).to[String].map(response => {
InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
})
}
}

it should "not enable invoker with invalid credential" in {
implicit val tid = transid()
val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
Expand Down Expand Up @@ -129,6 +142,10 @@ class TestFPCInvokerReactive extends InvokerCore with BasicHttpService {
complete("")
}

override def isEnabled(): Route = {
complete(InvokerEnabled(true).serialize())
}

override def backfillPrewarm(): Route = {
complete("")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,18 @@ class InvokerSupervisionTests
}
}

// unhealthy -> offline
// offline -> off
it should "go offline when unhealthy and disabled invoker ping received and stay offline if disabled ping received while offline" in {
val invoker =
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
invoker.stateName shouldBe Unhealthy
invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory), Some(false))
invoker.stateName shouldBe Offline
invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory), Some(false))
invoker.stateName shouldBe Offline
}

it should "start timer to send test actions when unhealthy" in {
val invoker =
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
Expand All @@ -382,6 +394,28 @@ class InvokerSupervisionTests
invoker.isTimerActive(InvokerActor.timerName) shouldBe false
}

// healthy -> offline
it should "go offline from healthy immediately when disabled invoker ping received" in {
val invoker =
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
invoker.stateName shouldBe Unhealthy

invoker.isTimerActive(InvokerActor.timerName) shouldBe true

// Fill buffer with successful invocations to become healthy again (one below errorTolerance)
(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Success)
}
invoker.stateName shouldBe Healthy

invoker.isTimerActive(InvokerActor.timerName) shouldBe false

invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory), Some(false))
invoker.stateName shouldBe Offline
}

it should "initially store invoker status with its full id - instance/uniqueName/displayedName" in {
val invoker0 = TestProbe()
val children = mutable.Queue(invoker0.ref)
Expand Down