Skip to content

Commit a3956e3

Browse files
style95michele-sciabarra
authored andcommitted
Delete ETCD data first when disabling the invoker (apache#5333)
* Delete ETCD data first when disabling the invoker * Add the cluster name to controllers and invokers * Handle unhandled message in the Removing state
1 parent 508d699 commit a3956e3

File tree

4 files changed

+120
-19
lines changed

4 files changed

+120
-19
lines changed

ansible/roles/controller/tasks/deploy.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@
154154

155155
"CONFIG_whisk_info_date": "{{ whisk.version.date }}"
156156
"CONFIG_whisk_info_buildNo": "{{ docker.image.tag }}"
157+
"CONFIG_whisk_cluster_name": "{{ whisk.cluster_name | lower }}"
157158

158159
"KAFKA_HOSTS": "{{ kafka_connect_string }}"
159160
"CONFIG_whisk_kafka_replicationFactor":

ansible/roles/invoker/tasks/deploy.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@
296296
"CONFIG_whisk_containerPool_batchDeletionSize": "{{ container_pool_batchDeletionSize | default(10) }}"
297297
"CONFIG_whisk_invoker_username": "{{ invoker.username }}"
298298
"CONFIG_whisk_invoker_password": "{{ invoker.password }}"
299+
"CONFIG_whisk_cluster_name": "{{ whisk.cluster_name | lower }}"
299300

300301
- name: extend invoker dns env
301302
set_fact:

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -606,12 +606,7 @@ class FunctionPullingContainerProxy(
606606
if (runningActivations.isEmpty) {
607607
logging.info(this, s"The Client closed in state: $stateName, action: ${data.action}")
608608
// Stop ContainerProxy(ActivationClientProxy will stop also when send ClientClosed to ContainerProxy).
609-
cleanUp(
610-
data.container,
611-
data.invocationNamespace,
612-
data.action.fullyQualifiedName(withVersion = true),
613-
data.action.rev,
614-
None)
609+
cleanUp(data.container, None, false)
615610
} else {
616611
logging.info(
617612
this,
@@ -624,6 +619,15 @@ class FunctionPullingContainerProxy(
624619
// ContainerProxy will be terminated by StateTimeout if there is no further activation
625620
case Event(GracefulShutdown, data: WarmData) =>
626621
logging.info(this, s"receive GracefulShutdown for action: ${data.action}")
622+
// clean up the etcd data first so that the scheduler can provision more containers in advance.
623+
dataManagementService ! UnregisterData(
624+
ContainerKeys.existingContainers(
625+
data.invocationNamespace,
626+
data.action.fullyQualifiedName(true),
627+
data.action.rev,
628+
Some(instance),
629+
Some(data.container.containerId)))
630+
627631
// Just send CloseClientProxy to ActivationClientProxy, make ActivationClientProxy throw ClientClosedException when fetchActivation next time.
628632
data.clientProxy ! CloseClientProxy
629633
stay
@@ -765,10 +769,14 @@ class FunctionPullingContainerProxy(
765769
case Event(StateTimeout, _) =>
766770
logging.error(this, s"could not receive ClientClosed for ${unusedTimeout}, so just stop the container proxy.")
767771

768-
stop
772+
stop()
769773

770774
case Event(Remove | GracefulShutdown, _) =>
771775
stay()
776+
777+
778+
case Event(DetermineKeepContainer(_), _) =>
779+
stay()
772780
}
773781

774782
whenUnhandled {

tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala

Lines changed: 103 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,17 @@
1717

1818
package org.apache.openwhisk.core.containerpool.v2.test
1919

20-
import java.net.InetSocketAddress
21-
import java.time.Instant
22-
import java.util.concurrent.TimeUnit
23-
import java.util.concurrent.atomic.AtomicInteger
24-
2520
import akka.actor.FSM.{CurrentState, StateTimeout, SubscribeTransitionCallBack, Transition}
2621
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
2722
import akka.http.scaladsl.model
2823
import akka.io.Tcp.Connect
2924
import akka.stream.scaladsl.{Sink, Source}
30-
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
25+
import akka.testkit.{ImplicitSender, TestFSMRef, TestKit, TestProbe}
3126
import akka.util.ByteString
3227
import com.ibm.etcd.api.{DeleteRangeResponse, KeyValue, PutResponse}
3328
import com.ibm.etcd.client.{EtcdClient => Client}
3429
import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction}
35-
import org.apache.openwhisk.common.{Logging, TransactionId}
30+
import org.apache.openwhisk.common.{GracefulShutdown, Logging, TransactionId}
3631
import org.apache.openwhisk.core.ack.ActiveAck
3732
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage}
3833
import org.apache.openwhisk.core.containerpool.logging.LogCollectingException
@@ -51,7 +46,7 @@ import org.apache.openwhisk.core.database.{ArtifactStore, StaleParameter, UserCo
5146
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
5247
import org.apache.openwhisk.core.entity.size._
5348
import org.apache.openwhisk.core.entity.types.AuthStore
54-
import org.apache.openwhisk.core.entity.{ExecutableWhiskAction, _}
49+
import org.apache.openwhisk.core.entity._
5550
import org.apache.openwhisk.core.etcd.EtcdClient
5651
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys
5752
import org.apache.openwhisk.core.etcd.EtcdType._
@@ -65,6 +60,10 @@ import org.scalatest.{Assertion, BeforeAndAfterAll, FlatSpecLike, Matchers}
6560
import spray.json.DefaultJsonProtocol._
6661
import spray.json.{JsObject, _}
6762

63+
import java.net.InetSocketAddress
64+
import java.time.Instant
65+
import java.util.concurrent.TimeUnit
66+
import java.util.concurrent.atomic.AtomicInteger
6867
import scala.collection.mutable
6968
import scala.collection.mutable.{Map => MutableMap}
7069
import scala.concurrent.duration._
@@ -291,8 +290,9 @@ class FunctionPullingContainerProxyTests
291290
Future.successful(count)
292291
}
293292

294-
def getLiveContainerCountFail(count: Long) = LoggedFunction { (_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
295-
Future.failed(new Exception("failure"))
293+
def getLiveContainerCountFail(count: Long) = LoggedFunction {
294+
(_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
295+
Future.failed(new Exception("failure"))
296296
}
297297

298298
def getLiveContainerCountFailFirstCall(count: Long) = {
@@ -961,7 +961,7 @@ class FunctionPullingContainerProxyTests
961961
}
962962
client.send(machine, ClientClosed)
963963

964-
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Running, Removing))
964+
probe.expectMsgAllOf(ContainerRemoved(false), Transition(machine, Running, Removing))
965965

966966
awaitAssert {
967967
factory.calls should have size 1
@@ -1137,7 +1137,8 @@ class FunctionPullingContainerProxyTests
11371137
}
11381138
}
11391139

1140-
it should "destroy container proxy when stopping due to timeout and getting live count fails permanently" in within(timeout) {
1140+
it should "destroy container proxy when stopping due to timeout and getting live count fails permanently" in within(
1141+
timeout) {
11411142
val authStore = mock[ArtifactWhiskAuthStore]
11421143
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
11431144
val get = getWhiskAction(Future(action.toWhiskAction))
@@ -1532,6 +1533,96 @@ class FunctionPullingContainerProxyTests
15321533
}
15331534
}
15341535

1536+
it should "remove the ETCD data first when disabling the container proxy" in within(timeout) {
1537+
val authStore = mock[ArtifactWhiskAuthStore]
1538+
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
1539+
val get = getWhiskAction(Future(action.toWhiskAction))
1540+
val dataManagementService = TestProbe()
1541+
val container = new TestContainer
1542+
val factory = createFactory(Future.successful(container))
1543+
val acker = createAcker()
1544+
val store = createStore
1545+
val collector = createCollector()
1546+
val counter = getLiveContainerCount(1)
1547+
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
1548+
val (client, clientFactory) = testClient
1549+
1550+
val instanceId = InvokerInstanceId(0, userMemory = defaultUserMemory)
1551+
val probe = TestProbe()
1552+
val machine =
1553+
TestFSMRef(
1554+
new FunctionPullingContainerProxy(
1555+
factory,
1556+
entityStore,
1557+
namespaceBlacklist,
1558+
get,
1559+
dataManagementService.ref,
1560+
clientFactory,
1561+
acker,
1562+
store,
1563+
collector,
1564+
counter,
1565+
limit,
1566+
instanceId,
1567+
invokerHealthManager.ref,
1568+
poolConfig,
1569+
timeoutConfig,
1570+
healthchecksConfig(),
1571+
None),
1572+
probe.ref)
1573+
1574+
registerCallback(machine, probe)
1575+
1576+
machine ! Initialize(invocationNamespace.asString, fqn, action, schedulerHost, rpcPort, messageTransId)
1577+
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
1578+
client.expectMsg(StartClient)
1579+
client.send(machine, ClientCreationCompleted())
1580+
1581+
val containerId = machine.underlyingActor.stateData.getContainer match {
1582+
case Some(container) => container.containerId
1583+
case None => ContainerId("")
1584+
}
1585+
1586+
dataManagementService.expectMsg(RegisterData(
1587+
s"${ContainerKeys.existingContainers(invocationNamespace.asString, fqn, action.rev, Some(instanceId), Some(containerId))}",
1588+
""))
1589+
1590+
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
1591+
expectInitialized(probe)
1592+
client.expectMsg(RequestActivation())
1593+
client.send(machine, message)
1594+
1595+
probe.expectMsg(Transition(machine, ClientCreated, Running))
1596+
client.expectMsg(ContainerWarmed)
1597+
client.expectMsgPF() {
1598+
case RequestActivation(Some(_), None) => true
1599+
}
1600+
client.send(machine, message)
1601+
client.expectMsgPF() {
1602+
case RequestActivation(Some(_), None) => true
1603+
}
1604+
machine ! GracefulShutdown
1605+
1606+
dataManagementService.expectMsg(
1607+
UnregisterData(ContainerKeys
1608+
.existingContainers(invocationNamespace.asString, fqn, action.rev, Some(instanceId), Some(containerId))))
1609+
1610+
client.expectMsg(CloseClientProxy)
1611+
client.send(machine, ClientClosed)
1612+
1613+
probe.expectMsgAllOf(ContainerRemoved(false), Transition(machine, Running, Removing))
1614+
1615+
awaitAssert {
1616+
factory.calls should have size 1
1617+
container.initializeCount shouldBe 1
1618+
container.runCount shouldBe 2
1619+
collector.calls.length shouldBe 2
1620+
container.destroyCount shouldBe 1
1621+
acker.calls.length shouldBe 2
1622+
store.calls.length shouldBe 2
1623+
}
1624+
}
1625+
15351626
it should "pause itself when timeout and recover when got a new Initialize" in within(timeout) {
15361627
val authStore = mock[ArtifactWhiskAuthStore]
15371628
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)

0 commit comments

Comments
 (0)