Skip to content

[New Scheduler] Add container message consumer #5111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ case class ActivationMessage(override val transid: TransactionId,
lockedArgs: Map[String, String] = Map.empty,
cause: Option[ActivationId] = None,
traceContext: Option[Map[String, String]] = None)
extends Message {
extends Message {

override def serialize = ActivationMessage.serdes.write(this).compactPrint

Expand All @@ -78,6 +78,7 @@ case class ActivationMessage(override val transid: TransactionId,
*/
abstract class AcknowledegmentMessage(private val tid: TransactionId) extends Message {
override val transid: TransactionId = tid

override def serialize: String = AcknowledegmentMessage.serdes.write(this).compactPrint

/** Pithy descriptor for logging. */
Expand Down Expand Up @@ -115,17 +116,23 @@ abstract class AcknowledegmentMessage(private val tid: TransactionId) extends Me
* The constructor is private so that callers must use the more restrictive constructors which ensure the respose is always
* Right when this message is created.
*/
case class CombinedCompletionAndResultMessage private (override val transid: TransactionId,
response: Either[ActivationId, WhiskActivation],
override val isSystemError: Option[Boolean],
instance: InstanceId)
extends AcknowledegmentMessage(transid) {
case class CombinedCompletionAndResultMessage private(override val transid: TransactionId,
response: Either[ActivationId, WhiskActivation],
override val isSystemError: Option[Boolean],
instance: InstanceId)
extends AcknowledegmentMessage(transid) {
override def messageType = "combined"

override def result = Some(response)

override def isSlotFree = Some(instance)

override def activationId = response.fold(identity, _.activationId)

override def toJson = CombinedCompletionAndResultMessage.serdes.write(this)

override def shrink = copy(response = response.flatMap(a => Left(a.activationId)))

override def toString = activationId.asString
}

Expand All @@ -135,16 +142,21 @@ case class CombinedCompletionAndResultMessage private (override val transid: Tra
* phase notification to the load balancer where an invoker first sends a `ResultMessage` and later sends the
* `CompletionMessage`.
*/
case class CompletionMessage private (override val transid: TransactionId,
override val activationId: ActivationId,
override val isSystemError: Option[Boolean],
instance: InstanceId)
extends AcknowledegmentMessage(transid) {
case class CompletionMessage private(override val transid: TransactionId,
override val activationId: ActivationId,
override val isSystemError: Option[Boolean],
instance: InstanceId)
extends AcknowledegmentMessage(transid) {
override def messageType = "completion"

override def result = None

override def isSlotFree = Some(instance)

override def toJson = CompletionMessage.serdes.write(this)

override def shrink = this

override def toString = activationId.asString
}

Expand All @@ -156,15 +168,22 @@ case class CompletionMessage private (override val transid: TransactionId,
* The constructor is private so that callers must use the more restrictive constructors which ensure the respose is always
* Right when this message is created.
*/
case class ResultMessage private (override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
extends AcknowledegmentMessage(transid) {
case class ResultMessage private(override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
extends AcknowledegmentMessage(transid) {
override def messageType = "result"

override def result = Some(response)

override def isSlotFree = None

override def isSystemError = response.fold(_ => None, a => Some(a.response.isWhiskError))

override def activationId = response.fold(identity, _.activationId)

override def toJson = ResultMessage.serdes.write(this)

override def shrink = copy(response = response.flatMap(a => Left(a.activationId)))

override def toString = activationId.asString
}

Expand Down Expand Up @@ -234,7 +253,7 @@ object AcknowledegmentMessage extends DefaultJsonProtocol {
Left(value.convertTo[ActivationId])

case _: JsObject => Right(value.convertTo[WhiskActivation])
case _ => deserializationError("could not read ResultMessage")
case _ => deserializationError("could not read ResultMessage")
}
}

Expand Down Expand Up @@ -265,6 +284,7 @@ case class PingMessage(instance: InvokerInstanceId) extends Message {

object PingMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))

implicit val serdes = jsonFormat(PingMessage.apply _, "name")
}

Expand All @@ -276,7 +296,7 @@ object EventMessageBody extends DefaultJsonProtocol {

implicit val format = new JsonFormat[EventMessageBody] {
def write(eventMessageBody: EventMessageBody) = eventMessageBody match {
case m: Metric => m.toJson
case m: Metric => m.toJson
case a: Activation => a.toJson
}

Expand All @@ -301,9 +321,11 @@ case class Activation(name: String,
causedBy: Option[String],
size: Option[Int] = None,
userDefinedStatusCode: Option[Int] = None)
extends EventMessageBody {
extends EventMessageBody {
val typeName = Activation.typeName

override def serialize = toJson.compactPrint

def entityPath: FullyQualifiedEntityName = EntityPath(name).toFullyQualifiedEntityName

def toJson = Activation.activationFormat.write(this)
Expand All @@ -327,12 +349,12 @@ object Activation extends DefaultJsonProtocol {
private implicit val durationFormat = new RootJsonFormat[Duration] {
override def write(obj: Duration): JsValue = obj match {
case o if o.isFinite => JsNumber(o.toMillis)
case _ => JsNumber.zero
case _ => JsNumber.zero
}

override def read(json: JsValue): Duration = json match {
case JsNumber(n) if n <= 0 => Duration.Zero
case JsNumber(n) => toDuration(n.longValue)
case JsNumber(n) => toDuration(n.longValue)
}
}

Expand All @@ -352,7 +374,7 @@ object Activation extends DefaultJsonProtocol {
"size",
"userDefinedStatusCode")

/** Get "StatusCode" from result response set by action developer **/
/** Get "StatusCode" from result response set by action developer * */
def userDefinedStatusCode(result: Option[JsValue]): Option[Int] = {
val statusCode = JsHelpers
.getFieldPath(result.get.asJsObject, ERROR_FIELD, "statusCode")
Expand Down Expand Up @@ -394,13 +416,17 @@ object Activation extends DefaultJsonProtocol {

case class Metric(metricName: String, metricValue: Long) extends EventMessageBody {
val typeName = "Metric"

override def serialize = toJson.compactPrint

def toJson = Metric.metricFormat.write(this).asJsObject
}

object Metric extends DefaultJsonProtocol {
val typeName = "Metric"

def parse(msg: String) = Try(metricFormat.read(msg.parseJson))

implicit val metricFormat = jsonFormat(Metric.apply _, "metricName", "metricValue")
}

Expand All @@ -411,7 +437,7 @@ case class EventMessage(source: String,
userId: UUID,
eventType: String,
timestamp: Long = System.currentTimeMillis())
extends Message {
extends Message {
override def serialize = EventMessage.format.write(this).compactPrint
}

Expand All @@ -434,7 +460,7 @@ case class InvokerResourceMessage(status: String,
inProgressMemory: Long,
tags: Seq[String],
dedicatedNamespaces: Seq[String])
extends Message {
extends Message {

/**
* Serializes message to string. Must be idempotent.
Expand All @@ -444,6 +470,7 @@ case class InvokerResourceMessage(status: String,

object InvokerResourceMessage extends DefaultJsonProtocol {
def parse(msg: String): Try[InvokerResourceMessage] = Try(serdes.read(msg.parseJson))

implicit val serdes =
jsonFormat(
InvokerResourceMessage.apply _,
Expand All @@ -462,23 +489,25 @@ object InvokerResourceMessage extends DefaultJsonProtocol {
*
* [
* ...
* {
* "data": "RunningData",
* "fqn": "whisk.system/elasticsearch/[email protected]",
* "invocationNamespace": "style95",
* "status": "Running",
* "waitingActivation": 1
* },
* {
* "data": "RunningData",
* "fqn": "whisk.system/elasticsearch/[email protected]",
* "invocationNamespace": "style95",
* "status": "Running",
* "waitingActivation": 1
* },
* ...
* ]
*/
object StatusQuery

case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)
extends Message {
extends Message {

override def serialize: String = StatusData.serdes.write(this).compactPrint

}

object StatusData extends DefaultJsonProtocol {

implicit val serdes =
Expand All @@ -495,9 +524,10 @@ case class ContainerCreationMessage(override val transid: TransactionId,
rpcPort: Int,
retryCount: Int = 0,
creationId: CreationId = CreationId.generate())
extends ContainerMessage(transid) {
extends ContainerMessage(transid) {

override def toJson: JsValue = ContainerCreationMessage.serdes.write(this)

override def serialize: String = toJson.compactPrint
}

Expand Down Expand Up @@ -526,8 +556,9 @@ case class ContainerDeletionMessage(override val transid: TransactionId,
action: FullyQualifiedEntityName,
revision: DocRevision,
whiskActionMetaData: WhiskActionMetaData)
extends ContainerMessage(transid) {
extends ContainerMessage(transid) {
override def toJson: JsValue = ContainerDeletionMessage.serdes.write(this)

override def serialize: String = toJson.compactPrint
}

Expand All @@ -544,6 +575,7 @@ object ContainerDeletionMessage extends DefaultJsonProtocol {

abstract class ContainerMessage(private val tid: TransactionId) extends Message {
override val transid: TransactionId = tid

override def serialize: String = ContainerMessage.serdes.write(this).compactPrint

/** Serializes the message to JSON. */
Expand All @@ -569,18 +601,31 @@ object ContainerMessage extends DefaultJsonProtocol {
}

sealed trait ContainerCreationError

object ContainerCreationError extends Enumeration {

case object NoAvailableInvokersError extends ContainerCreationError

case object NoAvailableResourceInvokersError extends ContainerCreationError

case object ResourceNotEnoughError extends ContainerCreationError

case object WhiskError extends ContainerCreationError

case object UnknownError extends ContainerCreationError

case object TimeoutError extends ContainerCreationError

case object ShuttingDownError extends ContainerCreationError

case object NonExecutableActionError extends ContainerCreationError

case object DBFetchError extends ContainerCreationError

case object BlackBoxError extends ContainerCreationError

case object ZeroNamespaceLimit extends ContainerCreationError

case object TooManyConcurrentRequests extends ContainerCreationError

val whiskErrors: Set[ContainerCreationError] =
Expand All @@ -594,26 +639,27 @@ object ContainerCreationError extends Enumeration {
TimeoutError,
ZeroNamespaceLimit)

def fromName(name: String) = name.toUpperCase match {
case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
private def parse(name: String) = name.toUpperCase match {
case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
case "NOAVAILABLERESOURCEINVOKERSERROR" => NoAvailableResourceInvokersError
case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
case "DBFETCHERROR" => DBFetchError
case "WHISKERROR" => WhiskError
case "BLACKBOXERROR" => BlackBoxError
case "TIMEOUTERROR" => TimeoutError
case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
case "UNKNOWNERROR" => UnknownError
case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
case "DBFETCHERROR" => DBFetchError
case "WHISKERROR" => WhiskError
case "BLACKBOXERROR" => BlackBoxError
case "TIMEOUTERROR" => TimeoutError
case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
case "UNKNOWNERROR" => UnknownError
}

implicit val serds = new RootJsonFormat[ContainerCreationError] {
override def write(error: ContainerCreationError): JsValue = JsString(error.toString)

override def read(json: JsValue): ContainerCreationError =
Try {
val JsString(str) = json
ContainerCreationError.fromName(str.trim.toUpperCase)
ContainerCreationError.parse(str.trim.toUpperCase)
} getOrElse {
throw deserializationError("ContainerCreationError must be a valid string")
}
Expand All @@ -632,7 +678,7 @@ case class ContainerCreationAckMessage(override val transid: TransactionId,
retryCount: Int = 0,
error: Option[ContainerCreationError] = None,
reason: Option[String] = None)
extends Message {
extends Message {

/**
* Serializes message to string. Must be idempotent.
Expand All @@ -642,6 +688,7 @@ case class ContainerCreationAckMessage(override val transid: TransactionId,

object ContainerCreationAckMessage extends DefaultJsonProtocol {
def parse(msg: String): Try[ContainerCreationAckMessage] = Try(serdes.read(msg.parseJson))

private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
private implicit val byteSizeSerdes = size.serdes
implicit val serdes = jsonFormat12(ContainerCreationAckMessage.apply)
Expand Down
Loading