Skip to content

Commit 3f65d78

Browse files
committed
Fixes bug in invoker supervision on startup.
1 parent 4babe39 commit 3f65d78

File tree

3 files changed

+108
-34
lines changed

3 files changed

+108
-34
lines changed

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -293,20 +293,12 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
293293

294294
val healthyTimeout: FiniteDuration = 10.seconds
295295

296-
// This is done at this point to not intermingle with the state-machine
297-
// especially their timeouts.
296+
// This is done at this point to not intermingle with the state-machine especially their timeouts.
298297
def customReceive: Receive = {
299-
case _: RecordMetadata => // The response of putting testactions to the MessageProducer. We don't have to do anything with them.
298+
case _: RecordMetadata => // Ignores the result of publishing test actions to MessageProducer.
300299
}
301-
override def receive: Receive = customReceive.orElse(super.receive)
302300

303-
/** Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. */
304-
startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize)))
305-
306-
/** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
307-
when(Offline) {
308-
case Event(_: PingMessage, _) => goto(Unhealthy)
309-
}
301+
override def receive: Receive = customReceive.orElse(super.receive)
310302

311303
// To be used for all states that should send test actions to reverify the invoker
312304
val healthPingingState: StateFunction = {
@@ -317,27 +309,43 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
317309
stay
318310
}
319311

312+
// To be used for all states that should send test actions to reverify the invoker
313+
def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = {
314+
case _ -> `state` =>
315+
invokeTestAction()
316+
setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
317+
case `state` -> _ => cancelTimer(InvokerActor.timerName)
318+
}
319+
320+
/** Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. */
321+
startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize)))
322+
323+
/** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
324+
when(Offline) {
325+
case Event(_: PingMessage, _) => goto(Unhealthy)
326+
}
327+
320328
/** An Unhealthy invoker represents an invoker that was not able to handle actions successfully. */
321329
when(Unhealthy, stateTimeout = healthyTimeout)(healthPingingState)
322330

323331
/** An Unresponsive invoker represents an invoker that is not responding with active acks in a timely manner */
324332
when(Unresponsive, stateTimeout = healthyTimeout)(healthPingingState)
325333

326334
/**
327-
* A Healthy invoker is characterized by continuously getting pings. It will go offline if that state is not confirmed
328-
* for 20 seconds.
335+
* A Healthy invoker is characterized by continuously getting pings.
336+
* It will go offline if that state is not confirmed for 20 seconds.
329337
*/
330338
when(Healthy, stateTimeout = healthyTimeout) {
331339
case Event(_: PingMessage, _) => stay
332340
case Event(StateTimeout, _) => goto(Offline)
333341
}
334342

335-
/** Handle the completion of an Activation in every state. */
343+
/** Handles the completion of an Activation in every state. */
336344
whenUnhandled {
337345
case Event(cm: InvocationFinishedMessage, info) => handleCompletionMessage(cm.result, info.buffer)
338346
}
339347

340-
/** Logging on Transition change */
348+
/** Logs transition changes. */
341349
onTransition {
342350
case _ -> newState if !newState.isUsable =>
343351
transid.mark(
@@ -348,14 +356,6 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
348356
case _ -> newState if newState.isUsable => logging.info(this, s"$name is ${newState.asString}")
349357
}
350358

351-
// To be used for all states that should send test actions to reverify the invoker
352-
def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = {
353-
case _ -> `state` =>
354-
invokeTestAction()
355-
setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
356-
case `state` -> _ => cancelTimer(InvokerActor.timerName)
357-
}
358-
359359
onTransition(healthPingingTransitionHandler(Unhealthy))
360360
onTransition(healthPingingTransitionHandler(Unresponsive))
361361

@@ -372,28 +372,53 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
372372
buffer: RingBuffer[InvocationFinishedResult]) = {
373373
buffer.add(result)
374374

375-
// If the action is successful it seems like the Invoker is Healthy again. So we execute immediately
376-
// a new test action to remove the errors out of the RingBuffer as fast as possible.
375+
// If the action is successful, the Invoker is Healthy. We execute additional test actions
376+
// immediately to clear the RingBuffer as fast as possible.
377377
// The actions that arrive while the invoker is unhealthy are most likely health actions.
378378
// It is possible they are normal user actions as well. This can happen if such actions were in the
379379
// invoker queue or in progress while the invoker's status flipped to Unhealthy.
380380
if (result == InvocationFinishedResult.Success && stateName == Unhealthy) {
381381
invokeTestAction()
382382
}
383383

384-
// Stay in online if the activations was successful.
385-
// Stay in offline, if an activeAck reaches the controller.
384+
// Stay online if the activations was successful.
385+
// Stay offline if an activeAck is received (a stale activation) but the invoker ceased pinging.
386386
if ((stateName == Healthy && result == InvocationFinishedResult.Success) || stateName == Offline) {
387387
stay
388388
} else {
389389
val entries = buffer.toList
390-
// Goto Unhealthy or Unresponsive respectively if there are more errors than accepted in buffer, else goto Healthy
390+
391+
// Goto Unhealthy or Unresponsive respectively if there are more errors than accepted in buffer at steady state.
392+
// Otherwise transition to Healthy on successful activations only.
391393
if (entries.count(_ == InvocationFinishedResult.SystemError) > InvokerActor.bufferErrorTolerance) {
394+
// Note: The predicate is false if the ring buffer is still being primed
395+
// (i.e., the entries.size <= bufferErrorTolerance).
392396
gotoIfNotThere(Unhealthy)
393397
} else if (entries.count(_ == InvocationFinishedResult.Timeout) > InvokerActor.bufferErrorTolerance) {
398+
// Note: The predicate is false if the ring buffer is still being primed
399+
// (i.e., the entries.size <= bufferErrorTolerance).
394400
gotoIfNotThere(Unresponsive)
395401
} else {
396-
gotoIfNotThere(Healthy)
402+
result match {
403+
case InvocationFinishedResult.Success =>
404+
// Eagerly transition to healthy, at steady state (there aren't sufficient contra-indications) or
405+
// during priming of the ring buffer. In case of the latter, there is at least one additional test
406+
// action in flight which can reverse the transition later.
407+
gotoIfNotThere(Healthy)
408+
409+
case InvocationFinishedResult.SystemError if (entries.size <= InvokerActor.bufferErrorTolerance) =>
410+
// The ring buffer is not fully primed yet, stay/goto Unhealthy
411+
gotoIfNotThere(Unhealthy)
412+
413+
case InvocationFinishedResult.Timeout if (entries.size <= InvokerActor.bufferErrorTolerance) =>
414+
// The ring buffer is not fully primed yet, stay/goto Unresponsive
415+
gotoIfNotThere(Unresponsive)
416+
417+
case _ =>
418+
// at steady state, the state of the buffer superceded and we hold the current state
419+
// until enough events have occured to transition to a new state
420+
stay
421+
}
397422
}
398423
}
399424
}

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,19 +267,21 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
267267
freePool.get(sender()).foreach { f =>
268268
freePool = freePool - sender()
269269
}
270+
270271
// container was busy (busy indicates at full capacity), so there is capacity to accept another job request
271272
busyPool.get(sender()).foreach { _ =>
272273
busyPool = busyPool - sender()
273274
}
274275
processBufferOrFeed()
275276

276-
//in case this was a prewarm
277+
// in case this was a prewarm
277278
prewarmedPool.get(sender()).foreach { data =>
278279
prewarmedPool = prewarmedPool - sender()
279280
}
280-
//in case this was a starting prewarm
281+
282+
// in case this was a starting prewarm
281283
prewarmStartingPool.get(sender()).foreach { _ =>
282-
logging.info(this, "failed starting prewarm removed")
284+
logging.info(this, "failed starting prewarm, removed")
283285
prewarmStartingPool = prewarmStartingPool - sender()
284286
}
285287

tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,54 @@ class InvokerSupervisionTests
207207

208208
behavior of "InvokerActor"
209209

210+
it should "start and stay unhealthy while min threshold is not met" in {
211+
val invoker =
212+
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
213+
invoker.stateName shouldBe Unhealthy
214+
215+
(1 to InvokerActor.bufferErrorTolerance + 1).foreach { _ =>
216+
invoker ! InvocationFinishedMessage(
217+
InvokerInstanceId(0, userMemory = defaultUserMemory),
218+
InvocationFinishedResult.SystemError)
219+
invoker.stateName shouldBe Unhealthy
220+
}
221+
222+
(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance - 1).foreach { _ =>
223+
invoker ! InvocationFinishedMessage(
224+
InvokerInstanceId(0, userMemory = defaultUserMemory),
225+
InvocationFinishedResult.Success)
226+
invoker.stateName shouldBe Unhealthy
227+
}
228+
229+
invoker ! InvocationFinishedMessage(
230+
InvokerInstanceId(0, userMemory = defaultUserMemory),
231+
InvocationFinishedResult.Success)
232+
invoker.stateName shouldBe Healthy
233+
}
234+
235+
it should "revert to unhealthy during initial startup if there is a failed test activation" in {
236+
assume(InvokerActor.bufferErrorTolerance >= 3)
237+
238+
val invoker =
239+
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
240+
invoker.stateName shouldBe Unhealthy
241+
242+
invoker ! InvocationFinishedMessage(
243+
InvokerInstanceId(0, userMemory = defaultUserMemory),
244+
InvocationFinishedResult.SystemError)
245+
invoker.stateName shouldBe Unhealthy
246+
247+
invoker ! InvocationFinishedMessage(
248+
InvokerInstanceId(0, userMemory = defaultUserMemory),
249+
InvocationFinishedResult.Success)
250+
invoker.stateName shouldBe Healthy
251+
252+
invoker ! InvocationFinishedMessage(
253+
InvokerInstanceId(0, userMemory = defaultUserMemory),
254+
InvocationFinishedResult.SystemError)
255+
invoker.stateName shouldBe Unhealthy
256+
}
257+
210258
// unHealthy -> offline
211259
// offline -> unhealthy
212260
it should "start unhealthy, go offline if the state times out and goes unhealthy on a successful ping again" in {
@@ -318,7 +366,7 @@ class InvokerSupervisionTests
318366
}
319367
}
320368

321-
it should "start timer to send testactions when unhealthy" in {
369+
it should "start timer to send test actions when unhealthy" in {
322370
val invoker =
323371
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
324372
invoker.stateName shouldBe Unhealthy
@@ -337,7 +385,6 @@ class InvokerSupervisionTests
337385
}
338386

339387
it should "initially store invoker status with its full id - instance/uniqueName/displayedName" in {
340-
341388
val invoker0 = TestProbe()
342389
val children = mutable.Queue(invoker0.ref)
343390
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()

0 commit comments

Comments
 (0)