Delete ETCD data first when disabling the invoker #5333
@@ -606,12 +606,7 @@ class FunctionPullingContainerProxy(
      if (runningActivations.isEmpty) {
        logging.info(this, s"The Client closed in state: $stateName, action: ${data.action}")
        // Stop ContainerProxy(ActivationClientProxy will stop also when send ClientClosed to ContainerProxy).
        cleanUp(
          data.container,
          data.invocationNamespace,
          data.action.fullyQualifiedName(withVersion = true),
          data.action.rev,
          None)
        cleanUp(data.container, None, false)
This just deletes the container only, as the data has already been removed at this point.

It's guaranteed that the etcd data will be cleaned up, right, since the data management service will retry indefinitely once it receives the request? Is there any case where this condition can be hit without the …

Sort of a side question, but what happens to the etcd data if an invoker instance is lost unexpectedly? If the invoker is lost, is the data stuck indefinitely until an invoker with the same id is restarted to wipe it on startup?

All data is supposed to be removed after the lease timeout, because no more keepalive requests are sent when an invoker is unexpectedly terminated.
      } else {
        logging.info(
          this,
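The lease behaviour mentioned in the thread above (keys disappearing on their own once an invoker stops sending keepalives) can be pictured with a small sketch. The wrapper trait and method names below are assumptions for illustration, not OpenWhisk's actual etcd client API.

```scala
import scala.concurrent.duration._

// Hypothetical lease-aware client; the real EtcdClient API differs.
trait EtcdLeaseClient {
  def grant(ttl: FiniteDuration): Long                              // returns a lease id
  def putWithLease(key: String, value: String, leaseId: Long): Unit // key is bound to the lease
  def keepAlive(leaseId: Long): Unit                                // must be sent periodically
}

// A container key registered under a lease lives only as long as the lease is refreshed.
def registerContainerKey(etcd: EtcdLeaseClient, key: String): Long = {
  val leaseId = etcd.grant(10.seconds)
  etcd.putWithLease(key, "", leaseId)
  leaseId
}

// Healthy invoker: a background loop calls etcd.keepAlive(leaseId) well inside the TTL.
// Invoker lost unexpectedly: keepalives stop, the lease expires after its TTL, and etcd
// deletes every key attached to it, so the data is not stuck until an invoker with the
// same id is restarted.
```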
@@ -624,6 +619,15 @@ class FunctionPullingContainerProxy(
    // ContainerProxy will be terminated by StateTimeout if there is no further activation
    case Event(GracefulShutdown, data: WarmData) =>
      logging.info(this, s"receive GracefulShutdown for action: ${data.action}")
      // clean up the etcd data first so that the scheduler can provision more containers in advance.
      dataManagementService ! UnregisterData(
Tangential, but would it be useful to add an API to the invoker that shows (1) how many action containers are still active in the container pool and (2) how many containers there are per action? That way you could use the API to check whether the disabling process has completed, i.e. all containers are shut down and it is safe to shut down the service.

Yes, that would be great.
        ContainerKeys.existingContainers(
          data.invocationNamespace,
          data.action.fullyQualifiedName(true),
          data.action.rev,
          Some(instance),
          Some(data.container.containerId)))

      // Just send CloseClientProxy to ActivationClientProxy, make ActivationClientProxy throw ClientClosedException when fetchActivation next time.
      data.clientProxy ! CloseClientProxy
      stay
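The reviewer's suggestion of an invoker endpoint that reports live container counts (so an operator can tell when a disabled invoker has fully drained) might look roughly like the sketch below. Nothing here exists in the PR; the route path, the `byAction` callback, and the JSON shape are illustrative assumptions.

```scala
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import spray.json.DefaultJsonProtocol._
import spray.json._

// Hypothetical drain-status route; only illustrates the idea, not actual OpenWhisk code.
def containerCountRoute(byAction: () => Map[String, Int]): Route =
  path("pool" / "count") {
    get {
      val counts = byAction()
      complete(
        JsObject(
          "total" -> JsNumber(counts.values.sum), // 0 means every container has shut down
          "byAction" -> counts.toJson))           // e.g. {"ns/pkg/action": 2}
    }
  }

// An operator disabling an invoker could poll this route and stop the service once "total" is 0.
```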
@@ -765,10 +769,14 @@ class FunctionPullingContainerProxy(
    case Event(StateTimeout, _) =>
      logging.error(this, s"could not receive ClientClosed for ${unusedTimeout}, so just stop the container proxy.")

      stop
      stop()

    case Event(Remove | GracefulShutdown, _) =>
      stay()

    case Event(DetermineKeepContainer(_), _) =>
      stay()
  }

  whenUnhandled {
@@ -17,22 +17,17 @@

package org.apache.openwhisk.core.containerpool.v2.test

import java.net.InetSocketAddress
import java.time.Instant
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.FSM.{CurrentState, StateTimeout, SubscribeTransitionCallBack, Transition}
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.http.scaladsl.model
import akka.io.Tcp.Connect
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import akka.testkit.{ImplicitSender, TestFSMRef, TestKit, TestProbe}
import akka.util.ByteString
import com.ibm.etcd.api.{DeleteRangeResponse, KeyValue, PutResponse}
import com.ibm.etcd.client.{EtcdClient => Client}
import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction}
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.common.{GracefulShutdown, Logging, TransactionId}
import org.apache.openwhisk.core.ack.ActiveAck
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage}
import org.apache.openwhisk.core.containerpool.logging.LogCollectingException
@@ -51,7 +46,7 @@ import org.apache.openwhisk.core.database.{ArtifactStore, StaleParameter, UserCo
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.entity.types.AuthStore
import org.apache.openwhisk.core.entity.{ExecutableWhiskAction, _}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys
import org.apache.openwhisk.core.etcd.EtcdType._
@@ -65,6 +60,10 @@ import org.scalatest.{Assertion, BeforeAndAfterAll, FlatSpecLike, Matchers}
import spray.json.DefaultJsonProtocol._
import spray.json.{JsObject, _}

import java.net.InetSocketAddress
import java.time.Instant
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.collection.mutable.{Map => MutableMap}
import scala.concurrent.duration._
@@ -291,8 +290,9 @@ class FunctionPullingContainerProxyTests
      Future.successful(count)
  }

  def getLiveContainerCountFail(count: Long) = LoggedFunction { (_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
    Future.failed(new Exception("failure"))
  def getLiveContainerCountFail(count: Long) = LoggedFunction {
    (_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
      Future.failed(new Exception("failure"))
  }

  def getLiveContainerCountFailFirstCall(count: Long) = {
@@ -961,7 +961,7 @@ class FunctionPullingContainerProxyTests
    }
    client.send(machine, ClientClosed)

    probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Running, Removing))
    probe.expectMsgAllOf(ContainerRemoved(false), Transition(machine, Running, Removing))
Since the …

    awaitAssert {
      factory.calls should have size 1
@@ -1137,7 +1137,8 @@ class FunctionPullingContainerProxyTests
    }
  }

  it should "destroy container proxy when stopping due to timeout and getting live count fails permanently" in within(timeout) {
  it should "destroy container proxy when stopping due to timeout and getting live count fails permanently" in within(
    timeout) {
    val authStore = mock[ArtifactWhiskAuthStore]
    val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
    val get = getWhiskAction(Future(action.toWhiskAction))
@@ -1532,6 +1533,96 @@ class FunctionPullingContainerProxyTests
    }
  }

  it should "remove the ETCD data first when disabling the container proxy" in within(timeout) {
    val authStore = mock[ArtifactWhiskAuthStore]
    val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
    val get = getWhiskAction(Future(action.toWhiskAction))
    val dataManagementService = TestProbe()
    val container = new TestContainer
    val factory = createFactory(Future.successful(container))
    val acker = createAcker()
    val store = createStore
    val collector = createCollector()
    val counter = getLiveContainerCount(1)
    val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
    val (client, clientFactory) = testClient

    val instanceId = InvokerInstanceId(0, userMemory = defaultUserMemory)
    val probe = TestProbe()
    val machine =
      TestFSMRef(
        new FunctionPullingContainerProxy(
          factory,
          entityStore,
          namespaceBlacklist,
          get,
          dataManagementService.ref,
          clientFactory,
          acker,
          store,
          collector,
          counter,
          limit,
          instanceId,
          invokerHealthManager.ref,
          poolConfig,
          timeoutConfig,
          healthchecksConfig(),
          None),
        probe.ref)

    registerCallback(machine, probe)

    machine ! Initialize(invocationNamespace.asString, fqn, action, schedulerHost, rpcPort, messageTransId)
    probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
    client.expectMsg(StartClient)
    client.send(machine, ClientCreationCompleted())

    val containerId = machine.underlyingActor.stateData.getContainer match {
      case Some(container) => container.containerId
      case None            => ContainerId("")
    }
    dataManagementService.expectMsg(RegisterData(
      s"${ContainerKeys.existingContainers(invocationNamespace.asString, fqn, action.rev, Some(instanceId), Some(containerId))}",
      ""))

    probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
    expectInitialized(probe)
    client.expectMsg(RequestActivation())
    client.send(machine, message)

    probe.expectMsg(Transition(machine, ClientCreated, Running))
    client.expectMsg(ContainerWarmed)
    client.expectMsgPF() {
      case RequestActivation(Some(_), None) => true
    }
    client.send(machine, message)
    client.expectMsgPF() {
      case RequestActivation(Some(_), None) => true
    }
    machine ! GracefulShutdown

    dataManagementService.expectMsg(
      UnregisterData(ContainerKeys
        .existingContainers(invocationNamespace.asString, fqn, action.rev, Some(instanceId), Some(containerId))))

    client.expectMsg(CloseClientProxy)
    client.send(machine, ClientClosed)

    probe.expectMsgAllOf(ContainerRemoved(false), Transition(machine, Running, Removing))

    awaitAssert {
      factory.calls should have size 1
      container.initializeCount shouldBe 1
      container.runCount shouldBe 2
      collector.calls.length shouldBe 2
      container.destroyCount shouldBe 1
      acker.calls.length shouldBe 2
      store.calls.length shouldBe 2
    }
  }

  it should "pause itself when timeout and recover when got a new Initialize" in within(timeout) {
    val authStore = mock[ArtifactWhiskAuthStore]
    val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
This is one of the mandatory configs. It is used as a prefix for the ETCD data. When we deploy a new version, each version of the components can communicate with the others based on this information.

Can we default this to `whisk` in the `.conf`? That's what is used right now, right? Or is it like the Kafka prefix, where it would go in front of the `whisk` prefix, so if it's not defined in the config it just uses `''`?

If no configuration is provided, the default value `whisk` will be used. But the problem is, if you use different values for each deployment to distinguish them, it wouldn't work, as the scheduler data in ETCD only applies the prefix. Controllers fetch scheduler endpoints with this prefix, and schedulers fetch available invokers with this prefix.

Yeah, that makes sense. Just, if you don't care about that, you don't have to define this, right? If you did one deployment with no configuration it will be `whisk`, and then you could do a new deployment defining the config, and then there won't be any clash.

Exactly.
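A minimal sketch of the prefix scheme discussed above. The key layout and the default shown are assumptions for illustration; the real keys are produced by OpenWhisk's EtcdKV helpers (e.g. ContainerKeys.existingContainers in the diff).

```scala
// Illustrative only: shows how a cluster-name prefix keeps two deployments' ETCD data apart.
val defaultPrefix = "whisk" // assumed default when the config is not set

def schedulerEndpointKey(prefix: String, schedulerId: Int): String =
  s"$prefix/scheduler/$schedulerId"

def invokerKey(prefix: String, invokerId: Int): String =
  s"$prefix/invoker/$invokerId"

// Deployment A (no config)           -> keys under "whisk/..."
// Deployment B (prefix = "whisk-v2") -> keys under "whisk-v2/..."
// Controllers and schedulers only read their own prefix, so the two deployments
// never discover each other's components and there is no clash.
```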