Skip to content

Fixes bug in invoker supervision on startup. #5050

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -293,20 +293,12 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr

val healthyTimeout: FiniteDuration = 10.seconds

// This is done at this point to not intermingle with the state-machine
// especially their timeouts.
// This is done at this point to not intermingle with the state-machine especially their timeouts.
def customReceive: Receive = {
case _: RecordMetadata => // The response of putting testactions to the MessageProducer. We don't have to do anything with them.
case _: RecordMetadata => // Ignores the result of publishing test actions to MessageProducer.
}
override def receive: Receive = customReceive.orElse(super.receive)

/** Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. */
startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize)))

/** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
when(Offline) {
case Event(_: PingMessage, _) => goto(Unhealthy)
}
override def receive: Receive = customReceive.orElse(super.receive)

// To be used for all states that should send test actions to reverify the invoker
val healthPingingState: StateFunction = {
Expand All @@ -317,27 +309,43 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
stay
}

// To be used for all states that should send test actions to reverify the invoker
def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = {
case _ -> `state` =>
invokeTestAction()
setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
case `state` -> _ => cancelTimer(InvokerActor.timerName)
}

/** Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. */
startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize)))

/** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
when(Offline) {
case Event(_: PingMessage, _) => goto(Unhealthy)
}

/** An Unhealthy invoker represents an invoker that was not able to handle actions successfully. */
when(Unhealthy, stateTimeout = healthyTimeout)(healthPingingState)

/** An Unresponsive invoker represents an invoker that is not responding with active acks in a timely manner */
when(Unresponsive, stateTimeout = healthyTimeout)(healthPingingState)

/**
* A Healthy invoker is characterized by continuously getting pings. It will go offline if that state is not confirmed
* for 20 seconds.
* A Healthy invoker is characterized by continuously getting pings.
* It will go offline if that state is not confirmed for 20 seconds.
*/
when(Healthy, stateTimeout = healthyTimeout) {
case Event(_: PingMessage, _) => stay
case Event(StateTimeout, _) => goto(Offline)
}

/** Handle the completion of an Activation in every state. */
/** Handles the completion of an Activation in every state. */
whenUnhandled {
case Event(cm: InvocationFinishedMessage, info) => handleCompletionMessage(cm.result, info.buffer)
}

/** Logging on Transition change */
/** Logs transition changes. */
onTransition {
case _ -> newState if !newState.isUsable =>
transid.mark(
Expand All @@ -348,14 +356,6 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
case _ -> newState if newState.isUsable => logging.info(this, s"$name is ${newState.asString}")
}

// To be used for all states that should send test actions to reverify the invoker
def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = {
case _ -> `state` =>
invokeTestAction()
setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
case `state` -> _ => cancelTimer(InvokerActor.timerName)
}

onTransition(healthPingingTransitionHandler(Unhealthy))
onTransition(healthPingingTransitionHandler(Unresponsive))

Expand All @@ -372,28 +372,53 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
buffer: RingBuffer[InvocationFinishedResult]) = {
buffer.add(result)

// If the action is successful it seems like the Invoker is Healthy again. So we execute immediately
// a new test action to remove the errors out of the RingBuffer as fast as possible.
// If the action is successful, the Invoker is Healthy. We execute additional test actions
// immediately to clear the RingBuffer as fast as possible.
// The actions that arrive while the invoker is unhealthy are most likely health actions.
// It is possible they are normal user actions as well. This can happen if such actions were in the
// invoker queue or in progress while the invoker's status flipped to Unhealthy.
if (result == InvocationFinishedResult.Success && stateName == Unhealthy) {
invokeTestAction()
}

// Stay in online if the activations was successful.
// Stay in offline, if an activeAck reaches the controller.
// Stay online if the activations was successful.
// Stay offline if an activeAck is received (a stale activation) but the invoker ceased pinging.
if ((stateName == Healthy && result == InvocationFinishedResult.Success) || stateName == Offline) {
stay
} else {
val entries = buffer.toList
// Goto Unhealthy or Unresponsive respectively if there are more errors than accepted in buffer, else goto Healthy

// Goto Unhealthy or Unresponsive respectively if there are more errors than accepted in buffer at steady state.
// Otherwise transition to Healthy on successful activations only.
if (entries.count(_ == InvocationFinishedResult.SystemError) > InvokerActor.bufferErrorTolerance) {
// Note: The predicate is false if the ring buffer is still being primed
// (i.e., the entries.size <= bufferErrorTolerance).
gotoIfNotThere(Unhealthy)
} else if (entries.count(_ == InvocationFinishedResult.Timeout) > InvokerActor.bufferErrorTolerance) {
// Note: The predicate is false if the ring buffer is still being primed
// (i.e., the entries.size <= bufferErrorTolerance).
gotoIfNotThere(Unresponsive)
} else {
gotoIfNotThere(Healthy)
result match {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes above are comments and semantically preserving refactoring for clarity. The substance of the PR is in the following case statements on lines 409, 413 and 417.

case InvocationFinishedResult.Success =>
// Eagerly transition to healthy, at steady state (there aren't sufficient contra-indications) or
// during priming of the ring buffer. In case of the latter, there is at least one additional test
// action in flight which can reverse the transition later.
gotoIfNotThere(Healthy)

case InvocationFinishedResult.SystemError if (entries.size <= InvokerActor.bufferErrorTolerance) =>
// The ring buffer is not fully primed yet, stay/goto Unhealthy.
gotoIfNotThere(Unhealthy)

case InvocationFinishedResult.Timeout if (entries.size <= InvokerActor.bufferErrorTolerance) =>
// The ring buffer is not fully primed yet, stay/goto Unresponsive.
gotoIfNotThere(Unresponsive)

case _ =>
// At steady state, the state of the buffer superceded and we hold the current state
// until enough events have occured to transition to a new state.
stay
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,19 +270,21 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
freePool.get(sender()).foreach { f =>
freePool = freePool - sender()
}

// container was busy (busy indicates at full capacity), so there is capacity to accept another job request
busyPool.get(sender()).foreach { _ =>
busyPool = busyPool - sender()
}
processBufferOrFeed()

//in case this was a prewarm
// in case this was a prewarm
prewarmedPool.get(sender()).foreach { data =>
prewarmedPool = prewarmedPool - sender()
}
//in case this was a starting prewarm

// in case this was a starting prewarm
prewarmStartingPool.get(sender()).foreach { _ =>
logging.info(this, "failed starting prewarm removed")
logging.info(this, "failed starting prewarm, removed")
prewarmStartingPool = prewarmStartingPool - sender()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,54 @@ class InvokerSupervisionTests

behavior of "InvokerActor"

it should "start and stay unhealthy while min threshold is not met" in {
val invoker =
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
invoker.stateName shouldBe Unhealthy

(1 to InvokerActor.bufferErrorTolerance + 1).foreach { _ =>
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.SystemError)
invoker.stateName shouldBe Unhealthy
}

(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance - 1).foreach { _ =>
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Success)
invoker.stateName shouldBe Unhealthy
}

invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Success)
invoker.stateName shouldBe Healthy
}

it should "revert to unhealthy during initial startup if there is a failed test activation" in {
assume(InvokerActor.bufferErrorTolerance >= 3)

val invoker =
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
invoker.stateName shouldBe Unhealthy

invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.SystemError)
invoker.stateName shouldBe Unhealthy

invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Success)
invoker.stateName shouldBe Healthy

invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.SystemError)
invoker.stateName shouldBe Unhealthy
}

// unHealthy -> offline
// offline -> unhealthy
it should "start unhealthy, go offline if the state times out and goes unhealthy on a successful ping again" in {
Expand Down Expand Up @@ -318,7 +366,7 @@ class InvokerSupervisionTests
}
}

it should "start timer to send testactions when unhealthy" in {
it should "start timer to send test actions when unhealthy" in {
val invoker =
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
invoker.stateName shouldBe Unhealthy
Expand All @@ -337,7 +385,6 @@ class InvokerSupervisionTests
}

it should "initially store invoker status with its full id - instance/uniqueName/displayedName" in {

val invoker0 = TestProbe()
val children = mutable.Queue(invoker0.ref)
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()
Expand Down