Skip to content

Add zero downtime deployment #5338

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ansible/roles/invoker/tasks/clean.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

- name: wait invoker{{ groups['invokers'].index(inventory_hostname) }} to clean up all existing containers
uri:
url: "{{ invoker.protocol }}://{{ ansible_host }}:{{ invoker.port + groups['invokers'].index(inventory_hostname) }}/status/count"
url: "{{ invoker.protocol }}://{{ ansible_host }}:{{ invoker.port + groups['invokers'].index(inventory_hostname) }}/pool/count"
validate_certs: no
client_key: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/{{ invoker.ssl.key }}"
client_cert: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/{{ invoker.ssl.cert }}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,9 +532,7 @@ object InvokerResourceMessage extends DefaultJsonProtocol {
* ...
* ]
*/
object StatusQuery

object ActivationStatusQuery
object GetState

case class StatusData(invocationNamespace: String,
fqn: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
package org.apache.openwhisk.core.containerpool.v2

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{Actor, ActorRef, ActorRefFactory, Cancellable, Props}
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector.ContainerCreationError._
import org.apache.openwhisk.core.connector.{
ContainerCreationAckMessage,
ContainerCreationMessage,
ContainerDeletionMessage,
ResultMetadata,
StatusQuery
GetState,
ResultMetadata
}
import org.apache.openwhisk.core.containerpool.{
AdjustPrewarmedContainer,
Expand All @@ -41,6 +42,7 @@ import org.apache.openwhisk.core.containerpool.{
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.http.Messages
import spray.json.DefaultJsonProtocol

import scala.annotation.tailrec
import scala.collection.concurrent.TrieMap
Expand All @@ -50,6 +52,27 @@ import scala.concurrent.duration._
import scala.util.{Random, Try}
import scala.collection.immutable.Queue

object TotalContainerPoolState extends DefaultJsonProtocol {
implicit val prewarmedPoolSerdes = jsonFormat2(PrewarmedContainerPoolState.apply)
implicit val warmPoolSerdes = jsonFormat2(WarmContainerPoolState.apply)
implicit val totalPoolSerdes = jsonFormat5(TotalContainerPoolState.apply)
}

case class PrewarmedContainerPoolState(total: Int, countsByKind: Map[String, Int])
case class WarmContainerPoolState(total: Int, containers: List[BasicContainerInfo])
case class TotalContainerPoolState(totalContainers: Int,
inProgressCount: Int,
prewarmedPool: PrewarmedContainerPoolState,
busyPool: WarmContainerPoolState,
pausedPool: WarmContainerPoolState) {

def serialize(): String = TotalContainerPoolState.totalPoolSerdes.write(this).compactPrint
}

case class NotSupportedPoolState() {
def serialize(): String = "not supported"
}

case class CreationContainer(creationMessage: ContainerCreationMessage, action: WhiskAction)
case class DeletionContainer(deletionMessage: ContainerDeletionMessage)
case object Remove
Expand Down Expand Up @@ -88,7 +111,7 @@ class FunctionPullingContainerPool(

implicit val ec = context.system.dispatcher

protected[containerpool] var busyPool = immutable.Map.empty[ActorRef, Data]
protected[containerpool] var busyPool = immutable.Map.empty[ActorRef, ContainerAvailableData]
protected[containerpool] var inProgressPool = immutable.Map.empty[ActorRef, Data]
protected[containerpool] var warmedPool = immutable.Map.empty[ActorRef, WarmData]
protected[containerpool] var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
Expand Down Expand Up @@ -414,35 +437,14 @@ class FunctionPullingContainerPool(
prewarmCreateFailedCount.set(0)
adjustPrewarmedContainer(false, true)

case StatusQuery =>
var result = immutable.Map.empty[String, List[String]]
val pools = busyPool ++ warmedPool ++ inProgressPool
pools.foreach { entry =>
entry._2 match {
case InitializedData(container, _, action, _) =>
val key = action.fullyQualifiedName(true).asString
var list = result.getOrElse(key, List.empty[String])
list = container.containerId.asString :: list
result += (action.fullyQualifiedName(true).asString -> list)
case WarmData(container, _, action, _, _, _) =>
val key = action.fullyQualifiedName(true).asString
var list = result.getOrElse(key, List.empty[String])
list = container.containerId.asString :: list
result += (action.fullyQualifiedName(true).asString -> list)
case ContainerCreatedData(container, _, action) =>
val key = action.fullyQualifiedName(true).asString
var list = result.getOrElse(key, List.empty[String])
list = container.containerId.asString :: list
result += (action.fullyQualifiedName(true).asString -> list)
case ReschedulingData(container, _, action, _, _) =>
val key = action.fullyQualifiedName(true).asString
var list = result.getOrElse(key, List.empty[String])
list = container.containerId.asString :: list
result += (action.fullyQualifiedName(true).asString -> list)
case _ => // do nothing
}
}
sender() ! result
case GetState =>
val totalContainers = busyPool.size + inProgressPool.size + warmedPool.size + prewarmedPool.size
val prewarmedState =
PrewarmedContainerPoolState(prewarmedPool.size, prewarmedPool.groupBy(_._2.kind).mapValues(_.size))
val busyState = WarmContainerPoolState(busyPool.size, busyPool.values.map(_.basicContainerInfo).toList)
val pausedState = WarmContainerPoolState(warmedPool.size, warmedPool.values.map(_.basicContainerInfo).toList)
sender() ! TotalContainerPoolState(totalContainers, inProgressPool.size, prewarmedState, busyState, pausedState)

}

/** Install prewarm containers up to the configured requirements for each kind/memory combination or specified kind/memory */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,30 @@ case class PreWarmData(container: Container,
def isExpired(): Boolean = expires.exists(_.isOverdue())
}

case class ContainerCreatedData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction)
object BasicContainerInfo extends DefaultJsonProtocol {
implicit val prewarmedPoolSerdes = jsonFormat4(BasicContainerInfo.apply)
}

sealed case class BasicContainerInfo(containerId: String, namespace: String, action: String, kind: String)

sealed abstract class ContainerAvailableData(container: Container,
invocationNamespace: String,
action: ExecutableWhiskAction)
extends Data(action.limits.memory.megabytes.MB) {
override def getContainer = Some(container)

val basicContainerInfo =
BasicContainerInfo(container.containerId.asString, invocationNamespace, action.name.asString, action.exec.kind)
}

case class ContainerCreatedData(container: Container, invocationNamespace: String, action: ExecutableWhiskAction)
extends ContainerAvailableData(container, invocationNamespace, action)

case class InitializedData(container: Container,
invocationNamespace: String,
action: ExecutableWhiskAction,
override val clientProxy: ActorRef)
extends Data(action.limits.memory.megabytes.MB)
extends ContainerAvailableData(container, invocationNamespace, action)
with WithClient {
override def getContainer = Some(container)
def toReschedulingData(resumeRun: RunActivation) =
Expand All @@ -149,7 +163,7 @@ case class WarmData(container: Container,
revision: DocRevision,
lastUsed: Instant,
override val clientProxy: ActorRef)
extends Data(action.limits.memory.megabytes.MB)
extends ContainerAvailableData(container, invocationNamespace, action)
with WithClient {
override def getContainer = Some(container)
def toReschedulingData(resumeRun: RunActivation) =
Expand All @@ -159,12 +173,10 @@ case class WarmData(container: Container,
case class ReschedulingData(container: Container,
invocationNamespace: String,
action: ExecutableWhiskAction,
override val clientProxy: ActorRef,
clientProxy: ActorRef,
resumeRun: RunActivation)
extends Data(action.limits.memory.megabytes.MB)
with WithClient {
override def getContainer = Some(container)
}
extends ContainerAvailableData(container, invocationNamespace, action)
with WithClient

class FunctionPullingContainerProxy(
factory: (TransactionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,9 @@ class FPCInvokerReactive(config: WhiskConfig,
s"${instance.toString} is now disabled."
}

override def status(): Future[Map[String, List[String]]] = {
pool.ask(StatusQuery)(Timeout(5.seconds)).mapTo[Map[String, List[String]]]
override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = {
implicit val timeout: Timeout = 5.seconds
(pool ? GetState).mapTo[TotalContainerPoolState].map(Right(_))
}

override def isEnabled(): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,25 @@ class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemP
complete(invoker.disable())
} ~ (path("isEnabled") & get) {
complete(invoker.isEnabled())
} ~ (pathPrefix("status") & get) {
} ~ (pathPrefix("pool") & get) {
pathEndOrSingleSlash {
complete(invoker.status().map(_.toJson.compactPrint))
complete {
invoker.getPoolState().map {
case Right(poolState) =>
poolState.serialize()
case Left(value) =>
value.serialize()
}
}
} ~ (path("count") & get) {
complete(invoker.status().map(_.size.toJson.compactPrint))
complete {
invoker.getPoolState().map {
case Right(poolState) =>
poolState.totalContainers.toJson.compactPrint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the total containers in the container pool state object includes prewarm containers so not sure if you want that included in this route when determining if the invoker is cleared unless disabling immediately tears down prewarms than it's probably fine. If that isn't adequate, I think what you want for determining if the invoker is safe to shut down is summing the totalContainers values of the busyPool and warmedPool

case Left(value) =>
value.serialize()
}
}
}
}
case _ => terminate(StatusCodes.Unauthorized)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.WhiskConfig._
import org.apache.openwhisk.core.connector.{MessageProducer, MessagingProvider}
import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, TotalContainerPoolState}
import org.apache.openwhisk.core.containerpool.{Container, ContainerPoolConfig}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
Expand Down Expand Up @@ -250,7 +251,7 @@ trait InvokerCore {
def disable(): String
def isEnabled(): String
def backfillPrewarm(): Route
def status(): Future[Map[String, List[String]]]
def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one small change from the original codes.
I wanted to isolate Akka-HTTP dependency on the invoker server layer.
So each InvokerReactive just returns non-Akka-HTTP-related data.
The invoker server is only dependent on Akka-HTTP.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.openwhisk.core.invoker

import java.nio.charset.StandardCharsets
import java.time.Instant

import akka.Done
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
import akka.event.Logging.InfoLevel
Expand All @@ -28,6 +31,7 @@ import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, TotalContainerPoolState}
import org.apache.openwhisk.core.database._
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
Expand All @@ -38,9 +42,6 @@ import pureconfig._
import pureconfig.generic.auto._
import spray.json._

import java.nio.charset.StandardCharsets
import java.time.Instant
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -332,8 +333,8 @@ class InvokerReactive(
complete("not supported")
}

override def status(): Future[Map[String, List[String]]] = {
Future.successful(immutable.Map.empty[String, List[String]])
override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = {
Future.successful(Left(NotSupportedPoolState()))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
}

override def getQueueStatusData: Future[List[StatusData]] = {
queueManager.ask(StatusQuery)(Timeout(1.minute)).mapTo[Future[List[StatusData]]].flatten
queueManager.ask(GetState)(Timeout(1.minute)).mapTo[Future[List[StatusData]]].flatten
}

override def disable(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
stay

// common case for all statuses
case Event(StatusQuery, _) =>
case Event(GetState, _) =>
sender ! StatusData(
invocationNamespace,
action.asString,
Expand Down Expand Up @@ -1029,7 +1029,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
queue = newQueue
logging.info(
this,
s"[$invocationNamespace:$action:$stateName] Get activation request ${request.containerId}, send one message: ${msg.activationId}")(msg.transid)
s"[$invocationNamespace:$action:$stateName] Get activation request ${request.containerId}, send one message: ${msg.activationId}")(
msg.transid)
val totalTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration
MetricEmitter.emitHistogramMetric(
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,9 @@ class QueueManager(
case QueueSize =>
sender ! QueuePool.size

case StatusQuery =>
case GetState =>
val result =
Future.sequence(QueuePool.values.map(_.queue.ask(StatusQuery)(Timeout(5.seconds)).mapTo[StatusData]).toList)
Future.sequence(QueuePool.values.map(_.queue.ask(GetState)(Timeout(5.seconds)).mapTo[StatusData]).toList)

sender ! result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import akka.http.scaladsl.testkit.ScalatestRouteTest
import akka.http.scaladsl.unmarshalling.Unmarshal
import common.StreamLogging
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, TotalContainerPoolState}
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.invoker.{DefaultInvokerServer, InvokerCore}
import org.apache.openwhisk.http.BasicHttpService
Expand Down Expand Up @@ -156,8 +157,8 @@ class TestInvokerReactive extends InvokerCore with BasicHttpService {
complete("")
}

override def status(): Future[Map[String, List[String]]] = {
Future.successful(Map.empty[String, List[String]])
override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = {
Future.successful(Left(NotSupportedPoolState()))
}

def reset(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import akka.http.scaladsl.testkit.ScalatestRouteTest
import akka.http.scaladsl.unmarshalling.Unmarshal
import common.StreamLogging
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.containerpool.v2.{NotSupportedPoolState, TotalContainerPoolState}
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
import org.apache.openwhisk.core.invoker.{FPCInvokerServer, InvokerCore}
import org.apache.openwhisk.http.BasicHttpService
Expand Down Expand Up @@ -155,8 +156,8 @@ class TestFPCInvokerReactive extends InvokerCore with BasicHttpService {
complete("")
}

override def status(): Future[Map[String, List[String]]] = {
Future.successful(Map.empty[String, List[String]])
override def getPoolState(): Future[Either[NotSupportedPoolState, TotalContainerPoolState]] = {
Future.successful(Left(NotSupportedPoolState()))
}

def reset(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.openwhisk.common.{GracefulShutdown, TransactionId}
import org.apache.openwhisk.core.WarmUp.warmUpAction
import org.apache.openwhisk.core.ack.ActiveAck
import org.apache.openwhisk.core.connector.test.TestConnector
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage, StatusData, StatusQuery}
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage, GetState, StatusData}
import org.apache.openwhisk.core.database.{ArtifactStore, DocumentRevisionMismatchException, UserContext}
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
import org.apache.openwhisk.core.entity._
Expand Down Expand Up @@ -133,7 +133,7 @@ class QueueManagerTests
override def receive: Receive = {
case GetActivation(_, _, _, _, _, _) =>
sender ! ActivationResponse(Right(newActivation()))
case StatusQuery =>
case GetState =>
sender ! statusData
}
}))
Expand Down Expand Up @@ -797,7 +797,7 @@ class QueueManagerTests

(queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 1

(queueManager ? StatusQuery).mapTo[List[StatusData]].futureValue shouldBe List(statusData)
(queueManager ? GetState).mapTo[List[StatusData]].futureValue shouldBe List(statusData)
}

it should "drop the activation message that has not been scheduled for a long time" in {
Expand Down