Skip to content

Commit 44791f3

Browse files
authored
Handle container cleanup from ActivationClient shutdown gracefully (#5348)
* Fix the regression * Apply scalaFmt * Fix test cases * Make the MemoryQueueTests stable * Make the ActivationClientProxyTests stable
1 parent 077fb6d commit 44791f3

File tree

7 files changed

+91
-72
lines changed

7 files changed

+91
-72
lines changed

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.openwhisk.core.containerpool.v2
1919

2020
import akka.actor.Status.{Failure => FailureMessage}
21-
import akka.actor.{ActorRef, ActorSystem, FSM, Props, Stash}
21+
import akka.actor.{ActorSystem, FSM, Props, Stash}
2222
import akka.grpc.internal.ClientClosedException
2323
import akka.pattern.pipe
2424
import io.grpc.StatusRuntimeException
@@ -36,7 +36,7 @@ import scala.concurrent.Future
3636
import scala.util.{Success, Try}
3737

3838
// Event send by the actor
39-
case class ClientCreationCompleted(client: Option[ActorRef] = None)
39+
case object ClientCreationCompleted
4040
case object ClientClosed
4141

4242
// Event received by the actor
@@ -91,12 +91,14 @@ class ActivationClientProxy(
9191
stay using r
9292

9393
case Event(client: ActivationClient, _) =>
94-
context.parent ! ClientCreationCompleted()
94+
context.parent ! ClientCreationCompleted
9595

9696
goto(ClientProxyReady) using Client(client.client, client.rpcHost, client.rpcPort)
9797

9898
case Event(f: FailureMessage, _) =>
9999
logging.error(this, s"failed to create grpc client for ${action} caused by: $f")
100+
context.parent ! f
101+
100102
self ! ClientClosed
101103

102104
goto(ClientProxyRemoving)
@@ -164,9 +166,12 @@ class ActivationClientProxy(
164166
stay()
165167

166168
case _: ActionMismatch =>
167-
logging.error(this, s"[${containerId.asString}] action version does not match: $action")
169+
val errorMsg = s"[${containerId.asString}] action version does not match: $action"
170+
logging.error(this, errorMsg)
168171
c.activationClient.close().andThen {
169-
case _ => self ! ClientClosed
172+
case _ =>
173+
context.parent ! FailureMessage(new RuntimeException(errorMsg))
174+
self ! ClientClosed
170175
}
171176

172177
goto(ClientProxyRemoving)
@@ -194,6 +199,7 @@ class ActivationClientProxy(
194199
// it would print huge log due to create another grpcClient to fetch activation again.
195200
case t: StatusRuntimeException if t.getMessage.contains(ActivationClientProxy.hostResolveError) =>
196201
logging.error(this, s"[${containerId.asString}] akka grpc server connection failed: $t")
202+
context.parent ! FailureMessage(t)
197203
self ! ClientClosed
198204

199205
goto(ClientProxyRemoving)
@@ -208,14 +214,18 @@ class ActivationClientProxy(
208214

209215
stay()
210216

211-
case _: ClientClosedException =>
217+
case t: ClientClosedException =>
212218
logging.error(this, s"[${containerId.asString}] grpc client is already closed for $action")
219+
context.parent ! FailureMessage(t)
220+
213221
self ! ClientClosed
214222

215223
goto(ClientProxyRemoving)
216224

217225
case t: Throwable =>
218226
logging.error(this, s"[${containerId.asString}] get activation from remote server error: $t")
227+
context.parent ! FailureMessage(t)
228+
219229
safelyCloseClient(c)
220230
goto(ClientProxyRemoving)
221231
}
@@ -372,7 +382,7 @@ class ActivationClientProxy(
372382
logging.debug(this, s"grpc client is closed for $fqn in the Try closure")
373383
Future.successful(ClientClosed)
374384
}
375-
.getOrElse(Future.failed(new Exception(s"error to get $fqn activation from grpc server")))
385+
.getOrElse(Future.failed(new RuntimeException(s"error to get $fqn activation from grpc server")))
376386
}
377387

378388
private def createActivationClient(invocationNamespace: String,

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,7 @@ class FunctionPullingContainerProxy(
272272
job.rpcPort,
273273
container.containerId)) match {
274274
case Success(clientProxy) =>
275-
clientProxy ! StartClient
276-
ContainerCreatedData(container, job.invocationNamespace, job.action)
275+
InitializedData(container, job.invocationNamespace, job.action, clientProxy)
277276
case Failure(t) =>
278277
logging.error(this, s"failed to create activation client caused by: $t")
279278
ClientCreationFailed(t, container, job.invocationNamespace, job.action)
@@ -303,7 +302,7 @@ class FunctionPullingContainerProxy(
303302
// prewarmed state, container created
304303
when(ContainerCreated) {
305304
case Event(job: Initialize, data: PreWarmData) =>
306-
Try(
305+
val res = Try(
307306
clientProxyFactory(
308307
context,
309308
job.invocationNamespace,
@@ -313,13 +312,15 @@ class FunctionPullingContainerProxy(
313312
job.rpcPort,
314313
data.container.containerId)) match {
315314
case Success(proxy) =>
316-
proxy ! StartClient
315+
InitializedData(data.container, job.invocationNamespace, job.action, proxy)
317316
case Failure(t) =>
318317
logging.error(this, s"failed to create activation client for ${job.action} caused by: $t")
319-
self ! ClientCreationFailed(t, data.container, job.invocationNamespace, job.action)
318+
ClientCreationFailed(t, data.container, job.invocationNamespace, job.action)
320319
}
321320

322-
goto(CreatingClient) using ContainerCreatedData(data.container, job.invocationNamespace, job.action)
321+
self ! res
322+
323+
goto(CreatingClient)
323324

324325
case Event(Remove, data: PreWarmData) =>
325326
cleanUp(data.container, None, false)
@@ -334,41 +335,27 @@ class FunctionPullingContainerProxy(
334335

335336
when(CreatingClient) {
336337
// wait for client creation when cold start
337-
case Event(job: ContainerCreatedData, _: NonexistentData) =>
338-
stay() using job
338+
case Event(job: InitializedData, _) =>
339+
job.clientProxy ! StartClient
339340

340-
// wait for container creation when cold start
341-
case Event(ClientCreationCompleted(proxy), _: NonexistentData) =>
342-
akka.pattern.after(3.milliseconds, actorSystem.scheduler) {
343-
self ! ClientCreationCompleted(proxy.orElse(Some(sender())))
344-
Future.successful({})
345-
}
346-
347-
stay()
341+
stay() using job
348342

349343
// client was successfully obtained
350-
case Event(ClientCreationCompleted(proxy), data: ContainerCreatedData) =>
351-
val clientProxy = proxy.getOrElse(sender())
344+
case Event(ClientCreationCompleted, data: InitializedData) =>
352345
val fqn = data.action.fullyQualifiedName(true)
353346
val revision = data.action.rev
354347
dataManagementService ! RegisterData(
355348
s"${ContainerKeys.existingContainers(data.invocationNamespace, fqn, revision, Some(instance), Some(data.container.containerId))}",
356349
"")
357-
self ! InitializedData(data.container, data.invocationNamespace, data.action, clientProxy)
350+
self ! data
358351
goto(ClientCreated)
359352

360353
// client creation failed
361354
case Event(t: ClientCreationFailed, _) =>
362355
invokerHealthManager ! HealthMessage(state = false)
363356
cleanUp(t.container, t.invocationNamespace, t.action.fullyQualifiedName(withVersion = true), t.action.rev, None)
364357

365-
// there can be a case that client create is failed and a ClientClosed will be sent by ActivationClientProxy
366-
// wait for container creation when cold start
367-
case Event(ClientClosed, _: NonexistentData) =>
368-
self ! ClientClosed
369-
stay()
370-
371-
case Event(ClientClosed, data: ContainerCreatedData) =>
358+
case Event(ClientClosed, data: InitializedData) =>
372359
invokerHealthManager ! HealthMessage(state = false)
373360
cleanUp(
374361
data.container,
@@ -378,7 +365,7 @@ class FunctionPullingContainerProxy(
378365
None)
379366

380367
// container creation failed when cold start
381-
case Event(t: FailureMessage, _) =>
368+
case Event(_: FailureMessage, _) =>
382369
context.parent ! ContainerRemoved(true)
383370
stop()
384371

@@ -518,6 +505,8 @@ class FunctionPullingContainerProxy(
518505
data.action.fullyQualifiedName(withVersion = true),
519506
data.action.rev,
520507
Some(data.clientProxy))
508+
509+
case x: Event if x.event != PingCache => delay
521510
}
522511

523512
when(Running) {

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ case class HealthActivationServiceClient() extends Actor {
247247
private var closed: Boolean = false
248248

249249
override def receive: Receive = {
250-
case StartClient => sender() ! ClientCreationCompleted()
250+
case StartClient => sender() ! ClientCreationCompleted
251251
case _: RequestActivation =>
252252
InvokerHealthManager.healthActivation match {
253253
case Some(activation) if !closed =>

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,9 @@ class SchedulingDecisionMaker(
9696
this,
9797
s"there is no capacity activations will be dropped or throttled, (availableMsg: $availableMsg totalContainers: $totalContainers, limit: $limit, namespaceContainers: ${existingContainerCountInNs}, namespaceInProgressContainer: ${inProgressContainerCountInNs}) [$invocationNamespace:$action]")
9898
Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = totalContainers == 0), 0))
99-
case NamespaceThrottled if schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
99+
case NamespaceThrottled
100+
if schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(
101+
limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
100102
Future.successful(DecisionResults(DisableNamespaceThrottling, 0))
101103
// do nothing
102104
case _ =>

tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.containerpool.v2.test
1919

2020
import akka.Done
2121
import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
22+
import akka.actor.Status.Failure
2223
import akka.actor.{ActorRef, ActorSystem}
2324
import akka.grpc.internal.ClientClosedException
2425
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
@@ -103,7 +104,7 @@ class ActivationClientProxyTests
103104

104105
machine ! StartClient
105106

106-
probe.expectMsg(ClientCreationCompleted())
107+
probe.expectMsg(ClientCreationCompleted)
107108
probe.expectMsg(Transition(machine, ClientProxyUninitialized, ClientProxyReady))
108109
}
109110

@@ -124,6 +125,9 @@ class ActivationClientProxyTests
124125

125126
machine ! StartClient
126127

128+
probe.expectMsgPF() {
129+
case Failure(t) => t.getMessage shouldBe "The number of client creation retries has been exceeded."
130+
}
127131
probe.expectMsg(Transition(machine, ClientProxyUninitialized, ClientProxyRemoving))
128132
probe.expectMsg(ClientClosed)
129133

@@ -208,7 +212,14 @@ class ActivationClientProxyTests
208212
ready(machine, probe)
209213

210214
machine ! RequestActivation()
211-
probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
215+
216+
inAnyOrder {
217+
probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
218+
probe.expectMsgPF() {
219+
case Failure(t) => t.getMessage.contains(s"action version does not match") shouldBe true
220+
}
221+
}
222+
212223
probe.expectMsg(ClientClosed)
213224

214225
probe expectTerminated machine
@@ -319,7 +330,11 @@ class ActivationClientProxyTests
319330
ready(machine, probe)
320331

321332
machine ! RequestActivation()
333+
probe.expectMsgPF() {
334+
case Failure(t) => t.isInstanceOf[ClientClosedException] shouldBe true
335+
}
322336
probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
337+
323338
probe.expectMsg(ClientClosed)
324339

325340
probe expectTerminated machine
@@ -343,6 +358,9 @@ class ActivationClientProxyTests
343358
ready(machine, probe)
344359

345360
machine ! RequestActivation()
361+
probe.expectMsgPF() {
362+
case Failure(t) => t.getMessage.contains("Unknown exception") shouldBe true
363+
}
346364
probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
347365
probe.expectMsg(ClientClosed)
348366

@@ -426,7 +444,7 @@ class ActivationClientProxyTests
426444

427445
def ready(machine: ActorRef, probe: TestProbe) = {
428446
machine ! StartClient
429-
probe.expectMsg(ClientCreationCompleted())
447+
probe.expectMsg(ClientCreationCompleted)
430448
probe.expectMsg(Transition(machine, ClientProxyUninitialized, ClientProxyReady))
431449
}
432450

0 commit comments

Comments
 (0)