Skip to content

add enable/disable invoker support to old scheduler #5205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, sys
invoker.enable()
} ~ (path("disable") & post) {
invoker.disable()
} ~ (path("isEnabled") & get) {
invoker.isEnabled()
}
case _ => terminate(StatusCodes.Unauthorized)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.containerpool.v2._
import org.apache.openwhisk.core.database.{UserContext, _}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.{containerPrefix}
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, SchedulerKeys}
import org.apache.openwhisk.core.etcd.EtcdType._
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates}
import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
Expand Down Expand Up @@ -402,6 +403,10 @@ class FPCInvokerReactive(config: WhiskConfig,
complete("Successfully disabled invoker")
}

/**
 * Reports whether this invoker is currently enabled.
 *
 * The invoker counts as enabled only while both `consumer` and `warmUpWatcher` are non-empty.
 * The result is rendered as the compact JSON string produced by `InvokerEnabled.toJson()`.
 */
override def isEnabled(): Route = {
  val enabled = consumer.nonEmpty && warmUpWatcher.nonEmpty
  val body = InvokerEnabled(enabled).toJson().compactPrint
  complete(body)
}

override def backfillPrewarm(): Route = {
pool ! AdjustPrewarmedContainer
complete("backfilling prewarm container")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemP
invoker.enable()
} ~ (path("disable") & post) {
invoker.disable()
} ~ (path("isEnabled") & get) {
invoker.isEnabled()
}
case _ => terminate(StatusCodes.Unauthorized)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.openwhisk.spi.{Spi, SpiLoader}
import org.apache.openwhisk.utils.ExecutionContextFactory
import pureconfig._
import pureconfig.generic.auto._
import spray.json.{JsBoolean, JsObject}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
Expand Down Expand Up @@ -74,6 +75,10 @@ object Invoker {

val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)

/**
 * Enabled/disabled status of an invoker, as returned by the `isEnabled` endpoint.
 *
 * @param isEnabled whether the invoker is currently enabled
 */
case class InvokerEnabled(isEnabled: Boolean) {

  /** Serializes the status as a JSON object with a single boolean field named `enabled`. */
  def toJson(): JsObject = {
    val fields = Map("enabled" -> JsBoolean(isEnabled))
    JsObject(fields)
  }
}

/**
* An object which records the environment variables required for this component to run.
*/
Expand Down Expand Up @@ -220,6 +225,7 @@ trait InvokerProvider extends Spi {
/**
 * Management operations an invoker implementation exposes to its HTTP server.
 * Each operation returns the akka-http `Route` that produces the response.
 */
trait InvokerCore {
// Route for turning the invoker on; invoked by the invoker server's route tree.
def enable(): Route
// Route for turning the invoker off; the servers mount it under POST `disable`.
def disable(): Route
// Route reporting the current enabled state; the servers mount it under GET `isEnabled`.
def isEnabled(): Route
// Route that triggers a prewarm-container backfill (implementation specific — confirm per implementation).
def backfillPrewarm(): Route
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ package org.apache.openwhisk.core.invoker

import java.nio.charset.StandardCharsets
import java.time.Instant

import akka.Done
import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.server.Directives.complete
import akka.http.scaladsl.server.Route
Expand All @@ -34,6 +33,7 @@ import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.database.{UserContext, _}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.spi.SpiLoader
Expand All @@ -46,7 +46,6 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object InvokerReactive extends InvokerProvider {

override def instance(
config: WhiskConfig,
instance: InvokerInstanceId,
Expand Down Expand Up @@ -293,18 +292,38 @@ class InvokerReactive(
}

private val healthProducer = msgProvider.getProducer(config)
/**
 * Starts a new health-ping scheduler and returns its actor handle.
 *
 * The scheduled task sends a `PingMessage` for this invoker instance to the
 * `${Invoker.topicPrefix}health` topic and logs an error when the send fails.
 * Every call starts another scheduler, so the returned `ActorRef` must be kept
 * (see `healthScheduler`) in order to stop it later.
 *
 * No scheduler is started unconditionally here any more: an anonymous scheduler
 * whose handle is discarded could never be stopped by `disable()`, so the invoker
 * would keep pinging the controller even while disabled.
 *
 * @return the actor driving the periodic ping
 */
def getHealthScheduler: ActorRef =
  Scheduler.scheduleWaitAtMost(1.seconds)(() => {
    healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance)).andThen {
      case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
    }
  })

// Handle of the currently running health-ping scheduler. It is initialised with a freshly
// started scheduler; `disable()` stops the actor and resets this to `None`, and `enable()`
// repopulates it. `isEnabled()` reports whether it is non-empty.
private var healthScheduler: Option[ActorRef] = Some(getHealthScheduler)

/**
 * Enables this invoker by (re)starting the health-ping scheduler.
 *
 * If no scheduler is running, a new one is started via `getHealthScheduler` and stored in
 * `healthScheduler`; otherwise nothing changes. The response text states which case applied.
 *
 * The stale trailing `complete("not supported")` was removed: as the last expression it
 * replaced the result of the if/else, so callers always received "not supported" even
 * though the scheduler had been restarted.
 *
 * @return a route completing with "... is now enabled." or "... is already enabled."
 */
override def enable(): Route = {
  if (healthScheduler.isEmpty) {
    healthScheduler = Some(getHealthScheduler)
    complete(s"${instance.toString} is now enabled.")
  } else {
    complete(s"${instance.toString} is already enabled.")
  }
}

override def disable(): Route = {
complete("not supported")
if (healthScheduler.nonEmpty) {
actorSystem.stop(healthScheduler.get)
healthScheduler = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JFYI.
While it would be effective, invokers in my downstream are sending a certain message to controllers so that they can make invokers unhealthy immediately to avoid a long-running activation coming to a disabled invoker during the timeout.

But I think this is just a minor tweak.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that on the old scheduler for your downstream or a part of the new scheduler?

If on the old scheduler, I could do that as a follow up since it would require a change to the message bus and logic in the controller to read different health pings off the health topic. But I think it's more of an optimization, since the controller should recognize it as offline within ten seconds of disabling.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the new scheduler relies on ETCD for health, the logic resides in the old scheduler.

I looked into the downstream code and realized that we just did a workaround to minimize the change by using an instance ID that is less than 0.
I think this would not be an official way but just a workaround.

complete(s"${instance.toString} is now disabled.")
} else {
complete(s"${instance.toString} is already disabled.")
}
}

/**
 * Reports whether this invoker is currently enabled.
 *
 * The invoker counts as enabled while a health-ping scheduler is held in `healthScheduler`.
 * The result is rendered as the compact JSON string produced by `InvokerEnabled.toJson()`.
 */
override def isEnabled(): Route = {
  val status = InvokerEnabled(healthScheduler.isDefined)
  complete(status.toJson().compactPrint)
}

override def backfillPrewarm(): Route = {
Expand Down