Skip to content

Commit 3b6d07a

Browse files
bdoyle0182Brendan Doyle
andauthored
add enable/disable invoker support to old scheduler (#5205)
* add enable/disable invoker support to old scheduler and add is enabled route for invoker * feedback * fix enable complete * fail fast invoker to offline with updated ping message * test compilation * add tests * fix tests Co-authored-by: Brendan Doyle <[email protected]>
1 parent 5332e6d commit 3b6d07a

File tree

10 files changed

+137
-16
lines changed

10 files changed

+137
-16
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,14 +278,16 @@ object AcknowledegmentMessage extends DefaultJsonProtocol {
278278
}
279279
}
280280

281-
case class PingMessage(instance: InvokerInstanceId) extends Message {
281+
case class PingMessage(instance: InvokerInstanceId, isEnabled: Option[Boolean] = None) extends Message {
282282
override def serialize = PingMessage.serdes.write(this).compactPrint
283+
284+
def invokerEnabled: Boolean = isEnabled.getOrElse(true)
283285
}
284286

285287
object PingMessage extends DefaultJsonProtocol {
286288
def parse(msg: String) = Try(serdes.read(msg.parseJson))
287289

288-
implicit val serdes = jsonFormat(PingMessage.apply _, "name")
290+
implicit val serdes = jsonFormat(PingMessage.apply, "name", "isEnabled")
289291
}
290292

291293
trait EventMessageBody extends Message {

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,8 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
280280

281281
// To be used for all states that should send test actions to reverify the invoker
282282
val healthPingingState: StateFunction = {
283-
case Event(_: PingMessage, _) => stay
284-
case Event(StateTimeout, _) => goto(Offline)
283+
case Event(ping: PingMessage, _) => goOfflineIfDisabled(ping)
284+
case Event(StateTimeout, _) => goto(Offline)
285285
case Event(Tick, _) =>
286286
invokeTestAction()
287287
stay
@@ -300,7 +300,7 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
300300

301301
/** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
302302
when(Offline) {
303-
case Event(_: PingMessage, _) => goto(Unhealthy)
303+
case Event(ping: PingMessage, _) => if (ping.invokerEnabled) goto(Unhealthy) else stay
304304
}
305305

306306
/** An Unhealthy invoker represents an invoker that was not able to handle actions successfully. */
@@ -314,8 +314,8 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
314314
* It will go offline if that state is not confirmed for 20 seconds.
315315
*/
316316
when(Healthy, stateTimeout = healthyTimeout) {
317-
case Event(_: PingMessage, _) => stay
318-
case Event(StateTimeout, _) => goto(Offline)
317+
case Event(ping: PingMessage, _) => goOfflineIfDisabled(ping)
318+
case Event(StateTimeout, _) => goto(Offline)
319319
}
320320

321321
/** Handles the completion of an Activation in every state. */
@@ -339,6 +339,16 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
339339

340340
initialize()
341341

342+
/**
343+
* Handling for if a ping message from an invoker signals that it has been disabled to immediately transition to Offline.
344+
*
345+
* @param ping
346+
* @return
347+
*/
348+
private def goOfflineIfDisabled(ping: PingMessage) = {
349+
if (ping.invokerEnabled) stay else goto(Offline)
350+
}
351+
342352
/**
343353
* Handling for active acks. This method saves the result (successful or unsuccessful)
344354
* into an RingBuffer and checks, if the InvokerActor has to be changed to UnHealthy.

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, sys
4747
invoker.enable()
4848
} ~ (path("disable") & post) {
4949
invoker.disable()
50+
} ~ (path("isEnabled") & get) {
51+
invoker.isEnabled()
5052
}
5153
case _ => terminate(StatusCodes.Unauthorized)
5254
}

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@ import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
3434
import org.apache.openwhisk.core.containerpool.v2._
3535
import org.apache.openwhisk.core.database.{UserContext, _}
3636
import org.apache.openwhisk.core.entity._
37-
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.{containerPrefix}
37+
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
3838
import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue
3939
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, SchedulerKeys}
4040
import org.apache.openwhisk.core.etcd.EtcdType._
4141
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
42+
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
4243
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates}
4344
import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
4445
import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
@@ -402,6 +403,10 @@ class FPCInvokerReactive(config: WhiskConfig,
402403
complete("Successfully disabled invoker")
403404
}
404405

406+
override def isEnabled(): Route = {
407+
complete(InvokerEnabled(consumer.nonEmpty && warmUpWatcher.nonEmpty).serialize())
408+
}
409+
405410
override def backfillPrewarm(): Route = {
406411
pool ! AdjustPrewarmedContainer
407412
complete("backfilling prewarm container")

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerServer.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemP
4747
invoker.enable()
4848
} ~ (path("disable") & post) {
4949
invoker.disable()
50+
} ~ (path("isEnabled") & get) {
51+
invoker.isEnabled()
5052
}
5153
case _ => terminate(StatusCodes.Unauthorized)
5254
}

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.openwhisk.spi.{Spi, SpiLoader}
3535
import org.apache.openwhisk.utils.ExecutionContextFactory
3636
import pureconfig._
3737
import pureconfig.generic.auto._
38+
import spray.json._
3839

3940
import scala.concurrent.duration._
4041
import scala.concurrent.{Await, ExecutionContext, Future}
@@ -74,6 +75,15 @@ object Invoker {
7475

7576
val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
7677

78+
object InvokerEnabled extends DefaultJsonProtocol {
79+
def parseJson(string: String) = Try(serdes.read(string.parseJson))
80+
implicit val serdes = jsonFormat(InvokerEnabled.apply _, "enabled")
81+
}
82+
83+
case class InvokerEnabled(isEnabled: Boolean) {
84+
def serialize(): String = InvokerEnabled.serdes.write(this).compactPrint
85+
}
86+
7787
/**
7888
* An object which records the environment variables required for this component to run.
7989
*/
@@ -220,6 +230,7 @@ trait InvokerProvider extends Spi {
220230
trait InvokerCore {
221231
def enable(): Route
222232
def disable(): Route
233+
def isEnabled(): Route
223234
def backfillPrewarm(): Route
224235
}
225236

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@ package org.apache.openwhisk.core.invoker
1919

2020
import java.nio.charset.StandardCharsets
2121
import java.time.Instant
22-
2322
import akka.Done
24-
import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
23+
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
2524
import akka.event.Logging.InfoLevel
2625
import akka.http.scaladsl.server.Directives.complete
2726
import akka.http.scaladsl.server.Route
@@ -34,6 +33,7 @@ import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
3433
import org.apache.openwhisk.core.database.{UserContext, _}
3534
import org.apache.openwhisk.core.entity._
3635
import org.apache.openwhisk.core.entity.size._
36+
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
3737
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
3838
import org.apache.openwhisk.http.Messages
3939
import org.apache.openwhisk.spi.SpiLoader
@@ -46,7 +46,6 @@ import scala.concurrent.{ExecutionContext, Future}
4646
import scala.util.{Failure, Success}
4747

4848
object InvokerReactive extends InvokerProvider {
49-
5049
override def instance(
5150
config: WhiskConfig,
5251
instance: InvokerInstanceId,
@@ -293,18 +292,40 @@ class InvokerReactive(
293292
}
294293

295294
private val healthProducer = msgProvider.getProducer(config)
296-
Scheduler.scheduleWaitAtMost(1.seconds)(() => {
297-
healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance)).andThen {
295+
296+
private def getHealthScheduler: ActorRef =
297+
Scheduler.scheduleWaitAtMost(1.seconds)(() => pingController(isEnabled = true))
298+
299+
private def pingController(isEnabled: Boolean) = {
300+
healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance, isEnabled = Some(isEnabled))).andThen {
298301
case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
299302
}
300-
})
303+
}
304+
305+
private var healthScheduler: Option[ActorRef] = Some(getHealthScheduler)
301306

302307
override def enable(): Route = {
303-
complete("not supported")
308+
if (healthScheduler.isEmpty) {
309+
healthScheduler = Some(getHealthScheduler)
310+
complete(s"${instance.toString} is now enabled.")
311+
} else {
312+
complete(s"${instance.toString} is already enabled.")
313+
}
304314
}
305315

306316
override def disable(): Route = {
307-
complete("not supported")
317+
pingController(isEnabled = false)
318+
if (healthScheduler.nonEmpty) {
319+
actorSystem.stop(healthScheduler.get)
320+
healthScheduler = None
321+
complete(s"${instance.toString} is now disabled.")
322+
} else {
323+
complete(s"${instance.toString} is already disabled.")
324+
}
325+
}
326+
327+
override def isEnabled(): Route = {
328+
complete(InvokerEnabled(healthScheduler.nonEmpty).serialize())
308329
}
309330

310331
override def backfillPrewarm(): Route = {

tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized}
2121
import akka.http.scaladsl.model.headers.BasicHttpCredentials
2222
import akka.http.scaladsl.server.Route
2323
import akka.http.scaladsl.testkit.ScalatestRouteTest
24+
import akka.http.scaladsl.unmarshalling.Unmarshal
2425
import common.StreamLogging
2526
import org.apache.openwhisk.common.TransactionId
27+
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
2628
import org.apache.openwhisk.core.invoker.{DefaultInvokerServer, InvokerCore}
2729
import org.apache.openwhisk.http.BasicHttpService
2830
import org.junit.runner.RunWith
@@ -76,6 +78,17 @@ class DefaultInvokerServerTests
7678
}
7779
}
7880

81+
it should "check if invoker is enabled" in {
82+
implicit val tid = transid()
83+
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
84+
Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
85+
status should be(OK)
86+
Unmarshal(responseEntity).to[String].map(response => {
87+
InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
88+
})
89+
}
90+
}
91+
7992
it should "not enable invoker with invalid credential" in {
8093
implicit val tid = transid()
8194
val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
@@ -130,6 +143,10 @@ class TestInvokerReactive extends InvokerCore with BasicHttpService {
130143
complete("")
131144
}
132145

146+
override def isEnabled(): Route = {
147+
complete(InvokerEnabled(true).serialize())
148+
}
149+
133150
override def backfillPrewarm(): Route = {
134151
complete("")
135152
}

tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized}
2121
import akka.http.scaladsl.model.headers.BasicHttpCredentials
2222
import akka.http.scaladsl.server.Route
2323
import akka.http.scaladsl.testkit.ScalatestRouteTest
24+
import akka.http.scaladsl.unmarshalling.Unmarshal
2425
import common.StreamLogging
2526
import org.apache.openwhisk.common.TransactionId
27+
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
2628
import org.apache.openwhisk.core.invoker.{FPCInvokerServer, InvokerCore}
2729
import org.apache.openwhisk.http.BasicHttpService
2830
import org.junit.runner.RunWith
@@ -76,6 +78,17 @@ class FPCInvokerServerTests
7678
}
7779
}
7880

81+
it should "check if invoker is enabled" in {
82+
implicit val tid = transid()
83+
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
84+
Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
85+
status should be(OK)
86+
Unmarshal(responseEntity).to[String].map(response => {
87+
InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
88+
})
89+
}
90+
}
91+
7992
it should "not enable invoker with invalid credential" in {
8093
implicit val tid = transid()
8194
val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
@@ -129,6 +142,10 @@ class TestFPCInvokerReactive extends InvokerCore with BasicHttpService {
129142
complete("")
130143
}
131144

145+
override def isEnabled(): Route = {
146+
complete(InvokerEnabled(true).serialize())
147+
}
148+
132149
override def backfillPrewarm(): Route = {
133150
complete("")
134151
}

tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,18 @@ class InvokerSupervisionTests
364364
}
365365
}
366366

367+
// unhealthy -> offline
368+
// offline -> off
369+
it should "go offline when unhealthy and disabled invoker ping received and stay offline if disabled ping received while offline" in {
370+
val invoker =
371+
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
372+
invoker.stateName shouldBe Unhealthy
373+
invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory), Some(false))
374+
invoker.stateName shouldBe Offline
375+
invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory), Some(false))
376+
invoker.stateName shouldBe Offline
377+
}
378+
367379
it should "start timer to send test actions when unhealthy" in {
368380
val invoker =
369381
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
@@ -382,6 +394,28 @@ class InvokerSupervisionTests
382394
invoker.isTimerActive(InvokerActor.timerName) shouldBe false
383395
}
384396

397+
// healthy -> offline
398+
it should "go offline from healthy immediately when disabled invoker ping received" in {
399+
val invoker =
400+
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
401+
invoker.stateName shouldBe Unhealthy
402+
403+
invoker.isTimerActive(InvokerActor.timerName) shouldBe true
404+
405+
// Fill buffer with successful invocations to become healthy again (one below errorTolerance)
406+
(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
407+
invoker ! InvocationFinishedMessage(
408+
InvokerInstanceId(0, userMemory = defaultUserMemory),
409+
InvocationFinishedResult.Success)
410+
}
411+
invoker.stateName shouldBe Healthy
412+
413+
invoker.isTimerActive(InvokerActor.timerName) shouldBe false
414+
415+
invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory), Some(false))
416+
invoker.stateName shouldBe Offline
417+
}
418+
385419
it should "initially store invoker status with its full id - instance/uniqueName/displayedName" in {
386420
val invoker0 = TestProbe()
387421
val children = mutable.Queue(invoker0.ref)

0 commit comments

Comments
 (0)