Delete ETCD data first when disabling the invoker #5333

Merged 3 commits on Oct 14, 2022
1 change: 1 addition & 0 deletions ansible/roles/controller/tasks/deploy.yml
@@ -154,6 +154,7 @@

"CONFIG_whisk_info_date": "{{ whisk.version.date }}"
"CONFIG_whisk_info_buildNo": "{{ docker.image.tag }}"
"CONFIG_whisk_cluster_name": "{{ whisk.cluster_name | lower }}"
Member Author
This is one of the mandatory configs.
It is used as a prefix for the ETCD data.
When we deploy a new version, the components of each version can communicate with each other based on this information.
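To make the prefixing idea concrete, here is a minimal sketch; the key layout and helper names are illustrative assumptions, not the real ETCD key schema used by OpenWhisk.

// Illustrative only: a simplified stand-in for the real ETCD key schema.
object ClusterPrefixedKeys {
  // CONFIG_whisk_cluster_name is lower-cased before use, hence the "| lower" filter above.
  def prefix(clusterName: String): String = clusterName.toLowerCase

  // Keys are namespaced under the cluster prefix, so two deployments with
  // different cluster names never read each other's entries.
  def schedulerEndpointKey(clusterName: String, schedulerId: Int): String =
    s"${prefix(clusterName)}/scheduler/$schedulerId"

  def invokerKey(clusterName: String, invokerId: Int): String =
    s"${prefix(clusterName)}/invoker/$invokerId"
}

// Example: ClusterPrefixedKeys.invokerKey("ProdA", 0) == "proda/invoker/0"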

Contributor
Can we default this to whisk in the .conf? That's what is used right now, right? Or is it like the kafka prefix, where it goes in front of the whisk prefix, so if it's not defined in the config it just uses ''?

Member Author
If no configuration is provided, the default value whisk will be used.
But the problem is that if you use different values for each deployment to distinguish them, it wouldn't work, as only the scheduler data in ETCD applies the prefix.
Controllers fetch scheduler endpoints with this prefix, and schedulers fetch available invokers with this prefix.
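A minimal sketch of the defaulting behaviour, assuming a Typesafe-config style lookup; the config path below is an assumption for illustration, while the real value is wired in through CONFIG_whisk_cluster_name.

import com.typesafe.config.ConfigFactory

// Sketch only: the path "whisk.cluster.name" is an assumed name for illustration.
object ClusterName {
  private val config = ConfigFactory.load()

  // Fall back to "whisk" when no cluster name is configured; lower-case it so
  // that every component derives exactly the same key prefix.
  val value: String =
    if (config.hasPath("whisk.cluster.name")) config.getString("whisk.cluster.name").toLowerCase
    else "whisk"
}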

Contributor
Yeah, that makes sense. If you don't care about that, you don't have to define this, right? If you did one deployment with no configuration it would be whisk, and then you could do a new deployment that defines the config, and there wouldn't be any clash.

Member Author
Exactly.


"KAFKA_HOSTS": "{{ kafka_connect_string }}"
"CONFIG_whisk_kafka_replicationFactor":
1 change: 1 addition & 0 deletions ansible/roles/invoker/tasks/deploy.yml
@@ -296,6 +296,7 @@
"CONFIG_whisk_containerPool_batchDeletionSize": "{{ container_pool_batchDeletionSize | default(10) }}"
"CONFIG_whisk_invoker_username": "{{ invoker.username }}"
"CONFIG_whisk_invoker_password": "{{ invoker.password }}"
"CONFIG_whisk_cluster_name": "{{ whisk.cluster_name | lower }}"

- name: extend invoker dns env
set_fact:
@@ -606,12 +606,7 @@ class FunctionPullingContainerProxy(
if (runningActivations.isEmpty) {
logging.info(this, s"The Client closed in state: $stateName, action: ${data.action}")
// Stop ContainerProxy(ActivationClientProxy will stop also when send ClientClosed to ContainerProxy).
cleanUp(
data.container,
data.invocationNamespace,
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
None)
cleanUp(data.container, None, false)
Contributor
It's guaranteed that the etcd data will be cleaned up, right, since the data management service will retry indefinitely once it receives the request? Is there any case where this condition can be hit without the GracefulShutdown event occurring?

Sort of a side question, but what happens to the etcd data if an invoker instance is lost unexpectedly? If the invoker is lost, is the data stuck indefinitely until an invoker with the same id is restarted to wipe it on startup?

Member Author
All data are supposed to be removed after the lease timeout, because no more keepalive requests are sent when an invoker is unexpectedly terminated.
The default timeout is 10s, so there can be stale data for up to 10s.
It's a tradeoff: if this timeout is too short, a temporary network disruption that prevents keepalives from reaching ETCD can cause the data to be removed unexpectedly.
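As a small, self-contained sketch of the lease/keepalive behaviour described here (the types below are hypothetical stand-ins, not the actual etcd client API used by OpenWhisk): keys are bound to a lease, the invoker refreshes the lease while it is alive, and once keepalives stop the keys expire after the TTL, so stale data lingers for at most that window.

import scala.concurrent.duration._

// Hypothetical stand-ins for an etcd-style lease API, for illustration only.
final case class Lease(id: Long, ttl: FiniteDuration)

trait LeaseClient {
  def grant(ttl: FiniteDuration): Lease // create a lease with a time-to-live
  def keepAlive(lease: Lease): Unit // refresh the lease before it expires
  def putWithLease(key: String, value: String, lease: Lease): Unit
}

final class InvokerRegistration(client: LeaseClient, invokerKey: String) {
  // 10 seconds mirrors the default lease timeout mentioned above; tuning it trades
  // staleness after a crash against accidental removal during a short network blip.
  private val lease = client.grant(ttl = 10.seconds)

  def register(): Unit = client.putWithLease(invokerKey, "up", lease)

  // Called periodically while the invoker is healthy; if the invoker dies and keepalives
  // stop, etcd drops the keys once the lease expires (about 10s of possibly stale data).
  def heartbeat(): Unit = client.keepAlive(lease)
}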

} else {
logging.info(
this,
@@ -624,6 +619,15 @@ class FunctionPullingContainerProxy(
// ContainerProxy will be terminated by StateTimeout if there is no further activation
case Event(GracefulShutdown, data: WarmData) =>
logging.info(this, s"receive GracefulShutdown for action: ${data.action}")
// clean up the etcd data first so that the scheduler can provision more containers in advance.
dataManagementService ! UnregisterData(
Contributor
Tangential, but would it be useful to add an API to the invoker that shows 1. how many action containers are still active in the container pool and 2. how many containers there are per action? That way you could use the API to check whether the disabling process has completed, i.e. all containers are shut down and it's safe to shut down the service.
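For illustration, a hedged sketch of the summary such an endpoint might return; the names and shapes below are invented for this sketch and are not part of the PR.

// Hypothetical payload for the suggested endpoint; not part of this PR.
final case class ContainerPoolStatus(totalActiveContainers: Int, containersPerAction: Map[String, Int])

object ContainerPoolStatus {
  // Build the summary from a snapshot of the fully qualified action name of every live container.
  // An operator could poll this until it drains to zero after disabling the invoker.
  def from(liveContainerActions: Seq[String]): ContainerPoolStatus =
    ContainerPoolStatus(
      totalActiveContainers = liveContainerActions.size,
      containersPerAction = liveContainerActions.groupBy(identity).map { case (a, cs) => a -> cs.size })
}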

Member Author
Yes, that would be great.

ContainerKeys.existingContainers(
data.invocationNamespace,
data.action.fullyQualifiedName(true),
data.action.rev,
Some(instance),
Some(data.container.containerId)))

// Just send CloseClientProxy to ActivationClientProxy, make ActivationClientProxy throw ClientClosedException when fetchActivation next time.
data.clientProxy ! CloseClientProxy
stay
@@ -765,10 +769,14 @@ class FunctionPullingContainerProxy(
case Event(StateTimeout, _) =>
logging.error(this, s"could not receive ClientClosed for ${unusedTimeout}, so just stop the container proxy.")

stop
stop()

case Event(Remove | GracefulShutdown, _) =>
stay()


case Event(DetermineKeepContainer(_), _) =>
stay()
}

whenUnhandled {
@@ -17,22 +17,17 @@

package org.apache.openwhisk.core.containerpool.v2.test

import java.net.InetSocketAddress
import java.time.Instant
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.FSM.{CurrentState, StateTimeout, SubscribeTransitionCallBack, Transition}
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.http.scaladsl.model
import akka.io.Tcp.Connect
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import akka.testkit.{ImplicitSender, TestFSMRef, TestKit, TestProbe}
import akka.util.ByteString
import com.ibm.etcd.api.{DeleteRangeResponse, KeyValue, PutResponse}
import com.ibm.etcd.client.{EtcdClient => Client}
import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction}
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.common.{GracefulShutdown, Logging, TransactionId}
import org.apache.openwhisk.core.ack.ActiveAck
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage}
import org.apache.openwhisk.core.containerpool.logging.LogCollectingException
@@ -51,7 +46,7 @@ import org.apache.openwhisk.core.database.{ArtifactStore, StaleParameter, UserCo
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.entity.types.AuthStore
import org.apache.openwhisk.core.entity.{ExecutableWhiskAction, _}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys
import org.apache.openwhisk.core.etcd.EtcdType._
@@ -65,6 +60,10 @@ import org.scalatest.{Assertion, BeforeAndAfterAll, FlatSpecLike, Matchers}
import spray.json.DefaultJsonProtocol._
import spray.json.{JsObject, _}

import java.net.InetSocketAddress
import java.time.Instant
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.collection.mutable.{Map => MutableMap}
import scala.concurrent.duration._
@@ -291,8 +290,9 @@ class FunctionPullingContainerProxyTests
Future.successful(count)
}

def getLiveContainerCountFail(count: Long) = LoggedFunction { (_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
Future.failed(new Exception("failure"))
def getLiveContainerCountFail(count: Long) = LoggedFunction {
(_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
Future.failed(new Exception("failure"))
}

def getLiveContainerCountFailFirstCall(count: Long) = {
@@ -961,7 +961,7 @@ class FunctionPullingContainerProxyTests
}
client.send(machine, ClientClosed)

probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Running, Removing))
probe.expectMsgAllOf(ContainerRemoved(false), Transition(machine, Running, Removing))
Member Author
Since true here means that it replaces a prewarm container, false is the correct value.


awaitAssert {
factory.calls should have size 1
@@ -1137,7 +1137,8 @@ class FunctionPullingContainerProxyTests
}
}

it should "destroy container proxy when stopping due to timeout and getting live count fails permanently" in within(timeout) {
it should "destroy container proxy when stopping due to timeout and getting live count fails permanently" in within(
timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
@@ -1532,6 +1533,96 @@ class FunctionPullingContainerProxyTests
}
}

it should "remove the ETCD data first when disabling the container proxy" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCount(1)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient

val instanceId = InvokerInstanceId(0, userMemory = defaultUserMemory)
val probe = TestProbe()
val machine =
TestFSMRef(
new FunctionPullingContainerProxy(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
instanceId,
invokerHealthManager.ref,
poolConfig,
timeoutConfig,
healthchecksConfig(),
None),
probe.ref)

registerCallback(machine, probe)

machine ! Initialize(invocationNamespace.asString, fqn, action, schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())

val containerId = machine.underlyingActor.stateData.getContainer match {
case Some(container) => container.containerId
case None => ContainerId("")
}

dataManagementService.expectMsg(RegisterData(
s"${ContainerKeys.existingContainers(invocationNamespace.asString, fqn, action.rev, Some(instanceId), Some(containerId))}",
""))

probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
client.expectMsg(RequestActivation())
client.send(machine, message)

probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
client.send(machine, message)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}
machine ! GracefulShutdown

dataManagementService.expectMsg(
UnregisterData(ContainerKeys
.existingContainers(invocationNamespace.asString, fqn, action.rev, Some(instanceId), Some(containerId))))

client.expectMsg(CloseClientProxy)
client.send(machine, ClientClosed)

probe.expectMsgAllOf(ContainerRemoved(false), Transition(machine, Running, Removing))

awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 2
collector.calls.length shouldBe 2
container.destroyCount shouldBe 1
acker.calls.length shouldBe 2
store.calls.length shouldBe 2
}
}

it should "pause itself when timeout and recover when got a new Initialize" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)