Handle container cleanup from ActivationClient shutdown gracefully #5348
Changes from all commits
@@ -18,7 +18,7 @@
 package org.apache.openwhisk.core.containerpool.v2

 import akka.actor.Status.{Failure => FailureMessage}
-import akka.actor.{ActorRef, ActorSystem, FSM, Props, Stash}
+import akka.actor.{ActorSystem, FSM, Props, Stash}
 import akka.grpc.internal.ClientClosedException
 import akka.pattern.pipe
 import io.grpc.StatusRuntimeException
@@ -36,7 +36,7 @@ import scala.concurrent.Future
 import scala.util.{Success, Try}

 // Event send by the actor
-case class ClientCreationCompleted(client: Option[ActorRef] = None)
+case object ClientCreationCompleted
 case object ClientClosed

 // Event received by the actor
@@ -91,12 +91,14 @@ class ActivationClientProxy(
       stay using r

     case Event(client: ActivationClient, _) =>
-      context.parent ! ClientCreationCompleted()
+      context.parent ! ClientCreationCompleted

       goto(ClientProxyReady) using Client(client.client, client.rpcHost, client.rpcPort)

     case Event(f: FailureMessage, _) =>
       logging.error(this, s"failed to create grpc client for ${action} caused by: $f")
+      context.parent ! f

       self ! ClientClosed

       goto(ClientProxyRemoving)
@@ -164,9 +166,12 @@ class ActivationClientProxy(
         stay()

       case _: ActionMismatch =>
-        logging.error(this, s"[${containerId.asString}] action version does not match: $action")
+        val errorMsg = s"[${containerId.asString}] action version does not match: $action"
+        logging.error(this, errorMsg)
         c.activationClient.close().andThen {
-          case _ => self ! ClientClosed
+          case _ =>
+            context.parent ! FailureMessage(new RuntimeException(errorMsg))
+            self ! ClientClosed
         }

         goto(ClientProxyRemoving)

Review comment:
I was also seeing the issue of orphaned data for a test action that I frequently update every minute, outside of the scheduler deployments. Does it stand to reason that this was really the same issue and that this change will also fix it? The issue started happening, I want to say, around the same time as the October 13th commit that introduced the regression, so it seems likely to me.

Reply:
I think so.

Review comment:
So now, we need to send a FailureMessage to the parent as well.
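A note on the close-then-notify pattern in this hunk: Scala's Future.andThen runs its callback whether the future succeeded or failed, and it preserves the original result, so the `case _ =>` branch notifies the parent and sends ClientClosed even when closing the client fails. A minimal runnable sketch of that standard-library behavior (a toy example, not code from this PR):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object AndThenDemo extends App {
      // andThen fires for a failed future too, and the original failure is preserved
      val closing = Future
        .failed[Unit](new RuntimeException("close failed"))
        .andThen { case _ => println("cleanup side effect still runs") }

      Await.ready(closing, 1.second)
      println(closing.value) // Some(Failure(java.lang.RuntimeException: close failed))
    }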
@@ -194,6 +199,7 @@ class ActivationClientProxy(
       // it would print huge log due to create another grpcClient to fetch activation again.
       case t: StatusRuntimeException if t.getMessage.contains(ActivationClientProxy.hostResolveError) =>
         logging.error(this, s"[${containerId.asString}] akka grpc server connection failed: $t")
+        context.parent ! FailureMessage(t)
         self ! ClientClosed

         goto(ClientProxyRemoving)
@@ -208,14 +214,18 @@ class ActivationClientProxy(

         stay()

-      case _: ClientClosedException =>
+      case t: ClientClosedException =>
         logging.error(this, s"[${containerId.asString}] grpc client is already closed for $action")
+        context.parent ! FailureMessage(t)

         self ! ClientClosed

         goto(ClientProxyRemoving)

       case t: Throwable =>
         logging.error(this, s"[${containerId.asString}] get activation from remote server error: $t")
+        context.parent ! FailureMessage(t)

         safelyCloseClient(c)
         goto(ClientProxyRemoving)
     }
@@ -372,7 +382,7 @@ class ActivationClientProxy(
           logging.debug(this, s"grpc client is closed for $fqn in the Try closure")
           Future.successful(ClientClosed)
         }
-        .getOrElse(Future.failed(new Exception(s"error to get $fqn activation from grpc server")))
+        .getOrElse(Future.failed(new RuntimeException(s"error to get $fqn activation from grpc server")))
   }

   private def createActivationClient(invocationNamespace: String,
@@ -272,8 +272,7 @@ class FunctionPullingContainerProxy(
             job.rpcPort,
             container.containerId)) match {
           case Success(clientProxy) =>
-            clientProxy ! StartClient

Review comment:
Previously, this chaining of the future caused the timing issue:

    case Event(ClientCreationCompleted(proxy), _: NonexistentData) =>
      akka.pattern.after(3.milliseconds, actorSystem.scheduler) {
        self ! ClientCreationCompleted(proxy.orElse(Some(sender())))
        Future.successful({})
      }

It made the logic nondeterministic and less efficient, so I refactored it.

Reply:
Great.
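To illustrate why the refactored handshake needs no timer: the parent stores the child proxy's reference in its own state before the child can possibly reply, so ordinary actor message ordering guarantees the completion event finds the data in place. A toy akka-classic sketch of that idea (hypothetical actor and message names, assuming the akka-actor artifact on the classpath; not code from this PR):

    import akka.actor.{Actor, ActorRef, ActorSystem, Props}

    class Child extends Actor {
      def receive: Receive = { case "StartClient" => sender() ! "ClientCreationCompleted" }
    }

    class Parent extends Actor {
      def receive: Receive = {
        case "init" =>
          val child = context.actorOf(Props[Child](), "client-proxy")
          context.become(creatingClient(child)) // the state holds the ref first...
          child ! "StartClient"                 // ...and only then is the child started
      }

      def creatingClient(child: ActorRef): Receive = {
        case "ClientCreationCompleted" =>
          println(s"completed; proxy ref was already known: $child")
          context.system.terminate()
      }
    }

    object HandshakeDemo extends App {
      ActorSystem("demo").actorOf(Props[Parent](), "container-proxy") ! "init"
    }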
-            ContainerCreatedData(container, job.invocationNamespace, job.action)
+            InitializedData(container, job.invocationNamespace, job.action, clientProxy)
           case Failure(t) =>
             logging.error(this, s"failed to create activation client caused by: $t")
             ClientCreationFailed(t, container, job.invocationNamespace, job.action)
@@ -303,7 +302,7 @@ class FunctionPullingContainerProxy(
   // prewarmed state, container created
   when(ContainerCreated) {
     case Event(job: Initialize, data: PreWarmData) =>
-      Try(
+      val res = Try(
         clientProxyFactory(
           context,
           job.invocationNamespace,
@@ -313,13 +312,15 @@ class FunctionPullingContainerProxy(
           job.rpcPort,
           data.container.containerId)) match {
         case Success(proxy) =>
-          proxy ! StartClient
           InitializedData(data.container, job.invocationNamespace, job.action, proxy)
         case Failure(t) =>
           logging.error(this, s"failed to create activation client for ${job.action} caused by: $t")
-          self ! ClientCreationFailed(t, data.container, job.invocationNamespace, job.action)
+          ClientCreationFailed(t, data.container, job.invocationNamespace, job.action)
       }

-      goto(CreatingClient) using ContainerCreatedData(data.container, job.invocationNamespace, job.action)
+      self ! res
+
+      goto(CreatingClient)

     case Event(Remove, data: PreWarmData) =>
       cleanUp(data.container, None, false)
@@ -334,41 +335,27 @@ class FunctionPullingContainerProxy(

   when(CreatingClient) {
-    // wait for client creation when cold start
-    case Event(job: ContainerCreatedData, _: NonexistentData) =>
-      stay() using job
+    case Event(job: InitializedData, _) =>
+      job.clientProxy ! StartClient

-    // wait for container creation when cold start
-    case Event(ClientCreationCompleted(proxy), _: NonexistentData) =>
-      akka.pattern.after(3.milliseconds, actorSystem.scheduler) {
-        self ! ClientCreationCompleted(proxy.orElse(Some(sender())))
-        Future.successful({})
-      }
-
-      stay()
+      stay() using job

Review comment:
Now, it sends the StartClient message here, after the InitializedData carrying the proxy reference has been received.

     // client was successfully obtained
-    case Event(ClientCreationCompleted(proxy), data: ContainerCreatedData) =>
-      val clientProxy = proxy.getOrElse(sender())
+    case Event(ClientCreationCompleted, data: InitializedData) =>
       val fqn = data.action.fullyQualifiedName(true)
       val revision = data.action.rev
       dataManagementService ! RegisterData(
         s"${ContainerKeys.existingContainers(data.invocationNamespace, fqn, revision, Some(instance), Some(data.container.containerId))}",
         "")
-      self ! InitializedData(data.container, data.invocationNamespace, data.action, clientProxy)
+      self ! data
       goto(ClientCreated)

     // client creation failed
     case Event(t: ClientCreationFailed, _) =>
       invokerHealthManager ! HealthMessage(state = false)
       cleanUp(t.container, t.invocationNamespace, t.action.fullyQualifiedName(withVersion = true), t.action.rev, None)

     // there can be a case that client create is failed and a ClientClosed will be sent by ActivationClientProxy
-    // wait for container creation when cold start
-    case Event(ClientClosed, _: NonexistentData) =>
-      self ! ClientClosed
-      stay()
-
-    case Event(ClientClosed, data: ContainerCreatedData) =>
+    case Event(ClientClosed, data: InitializedData) =>
       invokerHealthManager ! HealthMessage(state = false)
       cleanUp(
         data.container,
@@ -378,7 +365,7 @@ class FunctionPullingContainerProxy(
           None)

     // container creation failed when cold start
-    case Event(t: FailureMessage, _) =>
+    case Event(_: FailureMessage, _) =>
       context.parent ! ContainerRemoved(true)
       stop()
@@ -518,6 +505,8 @@ class FunctionPullingContainerProxy(
         data.action.fullyQualifiedName(withVersion = true),
         data.action.rev,
         Some(data.clientProxy))

+    case x: Event if x.event != PingCache => delay
   }

   when(Running) {

Review comment (author):
This will make sure events other than PingCache are not dropped in this state; they are delayed and replayed after the state transition.

Review comment (reviewer):
My concern with stashing a message for all events and then unstashing them on a state transition is that this pattern was the root cause of the edge case with the orphaned etcd data for pausing / unpausing containers. I just want to make sure that we're 100% sure a catch-all here will not have any unknown side effects.

Reply (author):
This state is a temporary state before removing containers and the proxy itself. In the Removing state:

    when(Removing, unusedTimeout) {
      // only if ClientProxy is closed, ContainerProxy stops. So it is important for ClientProxy to send ClientClosed.
      case Event(ClientClosed, _) =>
        stop()
      // even if any error occurs, it still waits for the ClientClosed event in order to be stopped after the client is closed.
      case Event(t: FailureMessage, _) =>
        logging.error(this, s"unable to delete a container due to ${t}")
        stay
      case Event(StateTimeout, _) =>
        logging.error(this, s"could not receive ClientClosed for ${unusedTimeout}, so just stop the container proxy.")
        stop()
      case Event(Remove | GracefulShutdown, _) =>
        stay()
      case Event(DetermineKeepContainer(_), _) =>
        stay()
    }

So I suppose it would be OK to delay.
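For readers unfamiliar with the pattern under discussion: `delay` stashes the current event so it is replayed after the next state transition instead of being dropped. A self-contained toy FSM showing those stash-and-replay mechanics (invented states and messages, assuming akka-classic FSM with Stash; this is not the proxy's actual code):

    import akka.actor.{ActorSystem, FSM, Props, Stash}

    sealed trait ToyState
    case object Busy extends ToyState
    case object Ready extends ToyState
    case object Done
    case object Work

    class Delaying extends FSM[ToyState, Unit] with Stash {
      startWith(Busy, ())

      when(Busy) {
        case Event(Done, _) => goto(Ready)     // the transition triggers unstashAll below
        case Event(_, _)    => stash(); stay() // the "delay" idea: keep the event, don't drop it
      }

      when(Ready) {
        case Event(Work, _) =>
          println("Work handled after the transition")
          context.system.terminate()
          stay()
      }

      onTransition { case Busy -> Ready => unstashAll() } // replay delayed events

      initialize()
    }

    object DelayDemo extends App {
      val fsm = ActorSystem("demo").actorOf(Props[Delaying]())
      fsm ! Work // arrives in Busy: stashed
      fsm ! Done // Busy -> Ready: Work is unstashed and handled in Ready
    }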
@@ -96,7 +96,9 @@ class SchedulingDecisionMaker(
             this,
             s"there is no capacity activations will be dropped or throttled, (availableMsg: $availableMsg totalContainers: $totalContainers, limit: $limit, namespaceContainers: ${existingContainerCountInNs}, namespaceInProgressContainer: ${inProgressContainerCountInNs}) [$invocationNamespace:$action]")
           Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = totalContainers == 0), 0))
-        case NamespaceThrottled if schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
+        case NamespaceThrottled
+            if schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(
+              limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
           Future.successful(DecisionResults(DisableNamespaceThrottling, 0))
         // do nothing
         case _ =>

Review comment:
It seems our CI tests do not run …
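To make the over-provision guard concrete, a worked example under assumed values (the numbers are illustrative, not from the PR): with limit = 2 and namespaceOverProvisionBeforeThrottleRatio = 1.5, the namespace may grow to ceiling(2 * 1.5) = 3 containers before throttling, so with 2 existing and 0 in-progress containers there is still headroom and throttling is disabled.

    // Hypothetical standalone check mirroring the guard above; `ceiling` is
    // assumed to round up to an Int like the scheduler's helper does.
    object OverProvisionDemo extends App {
      def ceiling(d: Double): Int = math.ceil(d).toInt

      val limit = 2                 // assumed per-namespace container limit
      val overProvisionRatio = 1.5  // assumed config ratio
      val existing = 2              // running containers in the namespace
      val inProgress = 0            // containers currently being created

      val headroom = ceiling(limit * overProvisionRatio) - existing - inProgress
      println(s"headroom = $headroom") // 3 - 2 - 0 = 1
      println(if (headroom > 0) "disable throttling" else "stay throttled")
    }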
Review comment (author, on the ClientCreationCompleted change):
As we store the reference of the client proxy after creation, we don't need to pass/receive the client-proxy reference via this message, so it became a case object.
https://github.com/apache/openwhisk/pull/5348/files#diff-23fc1c1634cd8a2e99b4cfbf342527248a53f5987911af80ab5d910ce7864d70R315
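Side by side, the message change looks like this (the Old/New suffixes are mine, added for contrast; the PR simply replaces one definition with the other):

    import akka.actor.ActorRef

    // Before: the completion event carried an optional proxy reference that
    // the parent had to resolve against sender(), sometimes with a retry timer.
    case class ClientCreationCompletedOld(client: Option[ActorRef] = None)

    // After: the parent already holds the proxy reference in its FSM state
    // data (InitializedData), so a bare signal suffices.
    case object ClientCreationCompletedNew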