Skip to content

Fix Orphaned Container Edge Case In Paused State of Container Proxy #5326

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class ShardingContainerPoolBalancer(
AkkaManagement(actorSystem).start()
ClusterBootstrap(actorSystem).start()
Some(Cluster(actorSystem))
} else if (loadConfigOrThrow[Seq[String]]("akka.cluster.seed-nodes").nonEmpty) {
} else if (loadConfigOrThrow[Seq[String]]("akka.cluster.seed-nodes").nonEmpty) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is just a missed scalafmt formatting fix (whitespace-only change).

Some(Cluster(actorSystem))
} else {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ case class Initialized(data: InitializedData)
case class Resumed(data: WarmData)
case class ResumeFailed(data: WarmData)
case class RecreateClient(action: ExecutableWhiskAction)
case class DetermineKeepContainer(attempt: Int)

// States
sealed trait ProxyState
Expand Down Expand Up @@ -661,6 +662,7 @@ class FunctionPullingContainerProxy(
val parent = context.parent
cancelTimer(IdleTimeoutName)
cancelTimer(KeepingTimeoutName)
cancelTimer(DetermineKeepContainer.toString)
data.container
.resume()
.map { _ =>
Expand Down Expand Up @@ -693,32 +695,43 @@ class FunctionPullingContainerProxy(
instance,
data.container.containerId))
goto(Running)

case Event(StateTimeout, data: WarmData) =>
(for {
count <- getLiveContainerCount(data.invocationNamespace, data.action.fullyQualifiedName(false), data.revision)
(warmedContainerKeepingCount, warmedContainerKeepingTimeout) <- getWarmedContainerLimit(
data.invocationNamespace)
} yield {
logging.info(
this,
s"Live container count: ${count}, warmed container keeping count configuration: ${warmedContainerKeepingCount} in namespace: ${data.invocationNamespace}")
if (count <= warmedContainerKeepingCount) {
Keep(warmedContainerKeepingTimeout)
} else {
Remove
}
}).pipeTo(self)
case Event(StateTimeout, _: WarmData) =>
self ! DetermineKeepContainer(0)
stay
case Event(DetermineKeepContainer(attempt), data: WarmData) =>
getLiveContainerCount(data.invocationNamespace, data.action.fullyQualifiedName(false), data.revision)
.flatMap(count => {
getWarmedContainerLimit(data.invocationNamespace).map(warmedContainerInfo => {
logging.info(
this,
s"Live container count: $count, warmed container keeping count configuration: ${warmedContainerInfo._1} in namespace: ${data.invocationNamespace}")
if (count <= warmedContainerInfo._1) {
self ! Keep(warmedContainerInfo._2)
} else {
self ! Remove
}
})
})
.recover({
case t: Throwable =>
logging.error(
this,
s"Failed to determine whether to keep or remove container on pause timeout for ${data.container.containerId}, retrying. Caused by: $t")
if (attempt < 5) {
startSingleTimer(DetermineKeepContainer.toString, DetermineKeepContainer(attempt + 1), 500.milli)
} else {
self ! Remove
}
})
stay

case Event(Keep(warmedContainerKeepingTimeout), data: WarmData) =>
logging.info(
this,
s"This is the remaining container for ${data.action}. The container will stop after $warmedContainerKeepingTimeout.")
startSingleTimer(KeepingTimeoutName, Remove, warmedContainerKeepingTimeout)
stay

case Event(Remove | GracefulShutdown, data: WarmData) =>
cancelTimer(DetermineKeepContainer.toString)
dataManagementService ! UnregisterData(
ContainerKeys.warmedContainers(
data.invocationNamespace,
Expand All @@ -732,7 +745,6 @@ class FunctionPullingContainerProxy(
data.action.fullyQualifiedName(false),
data.action.rev,
Some(data.clientProxy))

case _ => delay
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,22 @@ class FunctionPullingContainerProxyTests
Future.successful(count)
}

// Stub for getLiveContainerCount that always fails, simulating a permanently
// unavailable etcd/count lookup. The `count` parameter is accepted only to
// mirror the signature of the other count stubs; it is never used.
def getLiveContainerCountFail(count: Long) = LoggedFunction {
  (_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
    Future.failed(new Exception("failure"))
}

// Stub for getLiveContainerCount that fails only on its first invocation and
// answers every later call with the supplied count — used to drive the retry
// path of the pause-timeout keep/remove decision.
def getLiveContainerCountFailFirstCall(count: Long) = {
  var failedOnce = false
  LoggedFunction { (_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
    if (failedOnce) {
      Future.successful(count)
    } else {
      failedOnce = true
      Future.failed(new Exception("failure"))
    }
  }
}

// Stub returning a fixed (keeping-count, keeping-timeout) limit for any
// namespace, regardless of the namespace name passed in.
def getWarmedContainerLimit(limit: Future[(Int, FiniteDuration)]) =
  LoggedFunction { (_: String) =>
    limit
  }
Expand Down Expand Up @@ -1036,6 +1052,176 @@ class FunctionPullingContainerProxyTests
}
}

// Verifies the retry path added for the paused-state orphaned-container fix:
// the first getLiveContainerCount lookup fails, DetermineKeepContainer retries,
// and the second lookup returns a live count (2) above the keeping limit (1),
// so the paused container is removed and the proxy terminates.
it should "destroy container proxy when stopping due to timeout and getting live count fails once" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
// first call fails, second returns 2 — above the keeping count of 1 below
val counter = getLiveContainerCountFailFirstCall(2)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient

val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))

registerCallback(machine, probe)
probe watch machine

// drive the proxy through client creation and one activation into Running
machine ! Initialize(invocationNamespace.asString, fqn, action, schedulerHost, rpcPort, messageTransId)

probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())

probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
//register running container
dataManagementService.expectMsgType[RegisterData]

client.expectMsg(RequestActivation())
client.send(machine, message)

probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}

// idle timeout: Running -> Pausing -> Paused
machine ! StateTimeout
client.send(machine, RetryRequestActivation)
probe.expectMsg(Transition(machine, Running, Pausing))
probe.expectMsgType[ContainerIsPaused]
probe.expectMsg(Transition(machine, Pausing, Paused))
//register paused warmed container
dataManagementService.expectMsgType[RegisterData]

// pause timeout: count lookup fails once, is retried, then 2 > 1 => Remove
machine ! StateTimeout
client.expectMsg(StopClientProxy)
dataManagementService.expectMsgType[UnregisterData]
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing))
client.send(machine, ClientClosed)

probe expectTerminated machine

awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
store.calls.length shouldBe 1
}
}

// Verifies the fallback path of the paused-state fix: when every
// getLiveContainerCount lookup fails, DetermineKeepContainer exhausts its
// retries (5 attempts per the proxy implementation) and falls back to Remove,
// so the paused container is destroyed instead of being orphaned.
it should "destroy container proxy when stopping due to timeout and getting live count fails permanently" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
// every count lookup fails; the keep/remove decision can never be computed
val counter = getLiveContainerCountFail(2)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient

val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))

registerCallback(machine, probe)
probe watch machine

// drive the proxy through client creation and one activation into Running
machine ! Initialize(invocationNamespace.asString, fqn, action, schedulerHost, rpcPort, messageTransId)

probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())

probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
//register running container
dataManagementService.expectMsgType[RegisterData]

client.expectMsg(RequestActivation())
client.send(machine, message)

probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}

// idle timeout: Running -> Pausing -> Paused
machine ! StateTimeout
client.send(machine, RetryRequestActivation)
probe.expectMsg(Transition(machine, Running, Pausing))
probe.expectMsgType[ContainerIsPaused]
probe.expectMsg(Transition(machine, Pausing, Paused))
//register paused warmed container
dataManagementService.expectMsgType[UnregisterData]

// pause timeout: all lookups fail, retries exhausted => forced Remove
machine ! StateTimeout
client.expectMsg(StopClientProxy)
dataManagementService.expectMsgType[UnregisterData]
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing))
client.send(machine, ClientClosed)

probe expectTerminated machine

awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
store.calls.length shouldBe 1
}
}

it should "destroy container proxy even if there is no message from the client when stopping due to timeout" in within(
timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
Expand Down