Handle container cleanup from ActivationClient shutdown gracefully #5348


Merged: 5 commits, Nov 4, 2022
@@ -18,7 +18,7 @@
package org.apache.openwhisk.core.containerpool.v2

import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{ActorRef, ActorSystem, FSM, Props, Stash}
import akka.actor.{ActorSystem, FSM, Props, Stash}
import akka.grpc.internal.ClientClosedException
import akka.pattern.pipe
import io.grpc.StatusRuntimeException
@@ -36,7 +36,7 @@ import scala.concurrent.Future
import scala.util.{Success, Try}

// Event send by the actor
case class ClientCreationCompleted(client: Option[ActorRef] = None)
case object ClientCreationCompleted
Member Author:

Since we now store the client-proxy reference after creation, we no longer need to pass/receive it via this message, so it became a case object.

https://github.com/apache/openwhisk/pull/5348/files#diff-23fc1c1634cd8a2e99b4cfbf342527248a53f5987911af80ab5d910ce7864d70R315

case object ClientClosed

// Event received by the actor
@@ -91,12 +91,14 @@ class ActivationClientProxy(
stay using r

case Event(client: ActivationClient, _) =>
context.parent ! ClientCreationCompleted()
context.parent ! ClientCreationCompleted

goto(ClientProxyReady) using Client(client.client, client.rpcHost, client.rpcPort)

case Event(f: FailureMessage, _) =>
logging.error(this, s"failed to create grpc client for ${action} caused by: $f")
context.parent ! f

self ! ClientClosed

goto(ClientProxyRemoving)
@@ -164,9 +166,12 @@ class ActivationClientProxy(
stay()

case _: ActionMismatch =>
logging.error(this, s"[${containerId.asString}] action version does not match: $action")
val errorMsg = s"[${containerId.asString}] action version does not match: $action"
logging.error(this, errorMsg)
c.activationClient.close().andThen {
case _ => self ! ClientClosed
case _ =>
Contributor (@bdoyle0182, Nov 2, 2022):

I was also seeing the issue of orphaned data for a test action that I frequently update every minute, outside of the scheduler deployments. Does it stand to reason that this was really the same issue and that this change will also fix it?

The issue started happening around the same time as the October 13th commit that introduced the regression, so it seems likely to me.

Member Author:

I think so.
As I stated here, when the ActivationClientProxy initiated the clean-up process, it didn't let the ContainerProxy know. Since ETCD data clean-up is handled by the ContainerProxy, the data was not removed.

context.parent ! FailureMessage(new RuntimeException(errorMsg))
Member Author:

Since GracefulShutdown is introduced in the ContainerProxy layer, a clean-up process is generally initiated by the ContainerProxy: it cleans up ETCD data first, sends GracefulShutdown to the ActivationClientProxy, and waits for the ClientClosed message.
But in some cases like this one, the ActivationClientProxy needs to initiate the clean-up process itself.
In those cases, the ActivationClientProxy should let the ContainerProxy (its parent) know about the situation, so the parent can start the clean-up process immediately while the client proxy also shuts down.

So now we send a FailureMessage to the parent; the parent then cleans up the ETCD data and removes the container immediately via this kind of logic:
https://github.com/apache/openwhisk/pull/5348/files#diff-23fc1c1634cd8a2e99b4cfbf342527248a53f5987911af80ab5d910ce7864d70R368
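
A minimal sketch of that parent-notification pattern, using hypothetical actor classes and message names that only mirror the shape of the real ones (this is not the actual OpenWhisk implementation):

import akka.actor.{Actor, Props}
import akka.actor.Status.{Failure => FailureMessage}

// Hypothetical message, mirroring the one in this PR.
case object ClientClosed

// Child (client-proxy) side: on an unrecoverable error it notifies the parent first,
// so ETCD clean-up can start immediately, and then drives its own shutdown.
class ClientProxySketch extends Actor {
  def receive: Receive = {
    case t: Throwable =>
      context.parent ! FailureMessage(t) // parent starts cleaning up right away
      self ! ClientClosed                // we proceed with our own shutdown
    case ClientClosed =>
      context.stop(self)
  }
}

// Parent (container-proxy) side: cleans up on FailureMessage without waiting
// for ClientClosed, and only stops once the client is really closed.
class ContainerProxySketch(clientProps: Props) extends Actor {
  private val client = context.actorOf(clientProps)

  def receive: Receive = {
    case FailureMessage(t) =>
      println(s"cleaning up ETCD data and container after client failure: ${t.getMessage}")
    case ClientClosed =>
      context.stop(self)
  }
}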

self ! ClientClosed
}

goto(ClientProxyRemoving)
@@ -194,6 +199,7 @@ class ActivationClientProxy(
// it would print huge log due to create another grpcClient to fetch activation again.
case t: StatusRuntimeException if t.getMessage.contains(ActivationClientProxy.hostResolveError) =>
logging.error(this, s"[${containerId.asString}] akka grpc server connection failed: $t")
context.parent ! FailureMessage(t)
self ! ClientClosed

goto(ClientProxyRemoving)
@@ -208,14 +214,18 @@

stay()

case _: ClientClosedException =>
case t: ClientClosedException =>
logging.error(this, s"[${containerId.asString}] grpc client is already closed for $action")
context.parent ! FailureMessage(t)

self ! ClientClosed

goto(ClientProxyRemoving)

case t: Throwable =>
logging.error(this, s"[${containerId.asString}] get activation from remote server error: $t")
context.parent ! FailureMessage(t)

safelyCloseClient(c)
goto(ClientProxyRemoving)
}
@@ -372,7 +382,7 @@ class ActivationClientProxy(
logging.debug(this, s"grpc client is closed for $fqn in the Try closure")
Future.successful(ClientClosed)
}
.getOrElse(Future.failed(new Exception(s"error to get $fqn activation from grpc server")))
.getOrElse(Future.failed(new RuntimeException(s"error to get $fqn activation from grpc server")))
}

private def createActivationClient(invocationNamespace: String,
@@ -272,8 +272,7 @@ class FunctionPullingContainerProxy(
job.rpcPort,
container.containerId)) match {
case Success(clientProxy) =>
clientProxy ! StartClient
Member Author (@style95, Nov 2, 2022):

Previously, this chaining of the future caused a timing issue.
As a result, we needed this case:

    case Event(ClientCreationCompleted(proxy), _: NonexistentData) =>
      akka.pattern.after(3.milliseconds, actorSystem.scheduler) {
        self ! ClientCreationCompleted(proxy.orElse(Some(sender())))
        Future.successful({})
      }

https://github.com/apache/openwhisk/pull/5348/files#diff-23fc1c1634cd8a2e99b4cfbf342527248a53f5987911af80ab5d910ce7864d70L341

It made the logic non-deterministic and less efficient, so I refactored it.
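
A minimal sketch of the refactored handshake, with simplified stand-in states, data, and messages (the real FSM has many more of each): because the client-proxy reference now travels inside InitializedData, StartClient is only sent once that data is actually in hand, so no scheduler-based retry is needed.

import akka.actor.{ActorRef, FSM}

sealed trait ProxyState
case object CreatingClient extends ProxyState
case object ClientCreated extends ProxyState

sealed trait ProxyData
case object NoData extends ProxyData
final case class InitializedData(clientProxy: ActorRef) extends ProxyData

case object StartClient
case object ClientCreationCompleted

class HandshakeSketch extends FSM[ProxyState, ProxyData] {
  startWith(CreatingClient, NoData)

  when(CreatingClient) {
    // The proxy reference arrives together with the data, so StartClient cannot be
    // sent before the parent holds it -- this removes the old timing window.
    case Event(data: InitializedData, _) =>
      data.clientProxy ! StartClient
      stay() using data

    // The completion message is only acted on once InitializedData is stored.
    case Event(ClientCreationCompleted, data: InitializedData) =>
      goto(ClientCreated) using data
  }

  when(ClientCreated) {
    case Event(_, data) => stay() using data
  }

  initialize()
}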

Contributor:

great

ContainerCreatedData(container, job.invocationNamespace, job.action)
InitializedData(container, job.invocationNamespace, job.action, clientProxy)
case Failure(t) =>
logging.error(this, s"failed to create activation client caused by: $t")
ClientCreationFailed(t, container, job.invocationNamespace, job.action)
@@ -303,7 +302,7 @@ class FunctionPullingContainerProxy(
// prewarmed state, container created
when(ContainerCreated) {
case Event(job: Initialize, data: PreWarmData) =>
Try(
val res = Try(
clientProxyFactory(
context,
job.invocationNamespace,
@@ -313,13 +312,15 @@
job.rpcPort,
data.container.containerId)) match {
case Success(proxy) =>
proxy ! StartClient
InitializedData(data.container, job.invocationNamespace, job.action, proxy)
case Failure(t) =>
logging.error(this, s"failed to create activation client for ${job.action} caused by: $t")
self ! ClientCreationFailed(t, data.container, job.invocationNamespace, job.action)
ClientCreationFailed(t, data.container, job.invocationNamespace, job.action)
}

goto(CreatingClient) using ContainerCreatedData(data.container, job.invocationNamespace, job.action)
self ! res

goto(CreatingClient)

case Event(Remove, data: PreWarmData) =>
cleanUp(data.container, None, false)
@@ -334,41 +335,27 @@

when(CreatingClient) {
// wait for client creation when cold start
case Event(job: ContainerCreatedData, _: NonexistentData) =>
stay() using job
case Event(job: InitializedData, _) =>
Member Author:
Now it sends StartClient to the ActivationClientProxy only after it receives the InitializedData, so there is no timing issue.

job.clientProxy ! StartClient

// wait for container creation when cold start
case Event(ClientCreationCompleted(proxy), _: NonexistentData) =>
akka.pattern.after(3.milliseconds, actorSystem.scheduler) {
self ! ClientCreationCompleted(proxy.orElse(Some(sender())))
Future.successful({})
}

stay()
stay() using job

// client was successfully obtained
case Event(ClientCreationCompleted(proxy), data: ContainerCreatedData) =>
val clientProxy = proxy.getOrElse(sender())
case Event(ClientCreationCompleted, data: InitializedData) =>
val fqn = data.action.fullyQualifiedName(true)
val revision = data.action.rev
dataManagementService ! RegisterData(
s"${ContainerKeys.existingContainers(data.invocationNamespace, fqn, revision, Some(instance), Some(data.container.containerId))}",
"")
self ! InitializedData(data.container, data.invocationNamespace, data.action, clientProxy)
self ! data
goto(ClientCreated)

// client creation failed
case Event(t: ClientCreationFailed, _) =>
invokerHealthManager ! HealthMessage(state = false)
cleanUp(t.container, t.invocationNamespace, t.action.fullyQualifiedName(withVersion = true), t.action.rev, None)

// there can be a case that client create is failed and a ClientClosed will be sent by ActivationClientProxy
// wait for container creation when cold start
case Event(ClientClosed, _: NonexistentData) =>
self ! ClientClosed
stay()

case Event(ClientClosed, data: ContainerCreatedData) =>
case Event(ClientClosed, data: InitializedData) =>
invokerHealthManager ! HealthMessage(state = false)
cleanUp(
data.container,
Expand All @@ -378,7 +365,7 @@ class FunctionPullingContainerProxy(
None)

// container creation failed when cold start
case Event(t: FailureMessage, _) =>
case Event(_: FailureMessage, _) =>
context.parent ! ContainerRemoved(true)
stop()

@@ -518,6 +505,8 @@ class FunctionPullingContainerProxy(
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
Some(data.clientProxy))

case x: Event if x.event != PingCache => delay
Member Author:

This will make sure GracefulShutdown is properly handled in all states.

Contributor:

My concern with stashing all events and unstashing them on a state transition is that this was the root cause of the edge case with orphaned etcd data for pausing/unpausing containers: a generic FailureMessage would be stashed, and when the proxy transitioned to Running it would unstash the FailureMessage, leading to the bug / unexpected behavior.

I just want to make sure we're 100% certain a catch-all here will not have any unknown side effects.

Member Author:

This state is a temporary state before removing the container and the proxy itself, so the next state is always Removing: all cases in this state lead to it, and both cases clean up etcd data properly.

In the Removing state, there is no case that moves back to another state; it either stays or stops.

  when(Removing, unusedTimeout) {
    // only if ClientProxy is closed, ContainerProxy stops. So it is important for ClientProxy to send ClientClosed.
    case Event(ClientClosed, _) =>
      stop()

    // even if any error occurs, it still waits for ClientClosed event in order to be stopped after the client is closed.
    case Event(t: FailureMessage, _) =>
      logging.error(this, s"unable to delete a container due to ${t}")

      stay

    case Event(StateTimeout, _) =>
      logging.error(this, s"could not receive ClientClosed for ${unusedTimeout}, so just stop the container proxy.")

      stop()

    case Event(Remove | GracefulShutdown, _) =>
      stay()

    case Event(DetermineKeepContainer(_), _) =>
      stay()
  }

So I suppose it would be OK to delay.
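
For reference, a minimal sketch of the catch-all pattern being discussed, with hypothetical state and message names and a hand-written delay helper (the real FunctionPullingContainerProxy has many more states and handlers): everything except PingCache is stashed and replayed after the next transition, so a GracefulShutdown arriving in a temporary state is not lost.

import akka.actor.{FSM, Stash}

sealed trait PState
case object Pausing extends PState
case object Removing extends PState

case object PingCache
case object GracefulShutdown
case object Remove
case object ClientClosed

class DelaySketch extends FSM[PState, Unit] with Stash {
  startWith(Pausing, ())

  // Stash the current message so it is replayed after the next state transition.
  private def delay: State = { stash(); stay() }

  when(Pausing) {
    case Event(Remove, _)                 => goto(Removing)
    case Event(PingCache, _)              => stay() // safe to drop
    case x: Event if x.event != PingCache => delay  // e.g. GracefulShutdown is kept
  }

  when(Removing) {
    case Event(GracefulShutdown, _) => stay() // already shutting down
    case Event(ClientClosed, _)     => stop()
  }

  onTransition {
    case Pausing -> Removing => unstashAll() // delayed events are handled in Removing
  }

  initialize()
}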

}

when(Running) {
@@ -247,7 +247,7 @@ case class HealthActivationServiceClient() extends Actor {
private var closed: Boolean = false

override def receive: Receive = {
case StartClient => sender() ! ClientCreationCompleted()
case StartClient => sender() ! ClientCreationCompleted
case _: RequestActivation =>
InvokerHealthManager.healthActivation match {
case Some(activation) if !closed =>
@@ -96,7 +96,9 @@ class SchedulingDecisionMaker(
this,
s"there is no capacity activations will be dropped or throttled, (availableMsg: $availableMsg totalContainers: $totalContainers, limit: $limit, namespaceContainers: ${existingContainerCountInNs}, namespaceInProgressContainer: ${inProgressContainerCountInNs}) [$invocationNamespace:$action]")
Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = totalContainers == 0), 0))
case NamespaceThrottled if schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
case NamespaceThrottled
if schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(
Member Author:

It seems our CI tests do not run checkScalaFmtAll.
Some code already in the master branch is incorrectly formatted.

limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
Future.successful(DecisionResults(DisableNamespaceThrottling, 0))
// do nothing
case _ =>
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.containerpool.v2.test

import akka.Done
import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
import akka.actor.Status.Failure
import akka.actor.{ActorRef, ActorSystem}
import akka.grpc.internal.ClientClosedException
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
Expand Down Expand Up @@ -103,7 +104,7 @@ class ActivationClientProxyTests

machine ! StartClient

probe.expectMsg(ClientCreationCompleted())
probe.expectMsg(ClientCreationCompleted)
probe.expectMsg(Transition(machine, ClientProxyUninitialized, ClientProxyReady))
}

Expand All @@ -124,6 +125,9 @@ class ActivationClientProxyTests

machine ! StartClient

probe.expectMsgPF() {
case Failure(t) => t.getMessage shouldBe "The number of client creation retries has been exceeded."
}
probe.expectMsg(Transition(machine, ClientProxyUninitialized, ClientProxyRemoving))
probe.expectMsg(ClientClosed)

Expand Down Expand Up @@ -208,7 +212,14 @@ class ActivationClientProxyTests
ready(machine, probe)

machine ! RequestActivation()
probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))

inAnyOrder {
probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
probe.expectMsgPF() {
case Failure(t) => t.getMessage.contains(s"action version does not match") shouldBe true
}
}

probe.expectMsg(ClientClosed)

probe expectTerminated machine
Expand Down Expand Up @@ -319,7 +330,11 @@ class ActivationClientProxyTests
ready(machine, probe)

machine ! RequestActivation()
probe.expectMsgPF() {
case Failure(t) => t.isInstanceOf[ClientClosedException] shouldBe true
}
probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))

probe.expectMsg(ClientClosed)

probe expectTerminated machine
Expand All @@ -343,6 +358,9 @@ class ActivationClientProxyTests
ready(machine, probe)

machine ! RequestActivation()
probe.expectMsgPF() {
case Failure(t) => t.getMessage.contains("Unknown exception") shouldBe true
}
probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
probe.expectMsg(ClientClosed)

Expand Down Expand Up @@ -426,7 +444,7 @@ class ActivationClientProxyTests

def ready(machine: ActorRef, probe: TestProbe) = {
machine ! StartClient
probe.expectMsg(ClientCreationCompleted())
probe.expectMsg(ClientCreationCompleted)
probe.expectMsg(Transition(machine, ClientProxyUninitialized, ClientProxyReady))
}
