Skip to content

Commit 19f2ef9

Browse files
committed
Add container message consumer
1 parent aa7e6e2 commit 19f2ef9

File tree

5 files changed

+528
-1
lines changed

5 files changed

+528
-1
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,3 +484,166 @@ object StatusData extends DefaultJsonProtocol {
484484
implicit val serdes =
485485
jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", "waitingActivation", "status", "data")
486486
}
487+
488+
489+
case class ContainerCreationMessage(override val transid: TransactionId,
490+
invocationNamespace: String,
491+
action: FullyQualifiedEntityName,
492+
revision: DocRevision,
493+
whiskActionMetaData: WhiskActionMetaData,
494+
rootSchedulerIndex: SchedulerInstanceId,
495+
schedulerHost: String,
496+
rpcPort: Int,
497+
retryCount: Int = 0,
498+
creationId: CreationId = CreationId.generate())
499+
extends ContainerMessage(transid) {
500+
501+
override def toJson: JsValue = ContainerCreationMessage.serdes.write(this)
502+
override def serialize: String = toJson.compactPrint
503+
}
504+
505+
object ContainerCreationMessage extends DefaultJsonProtocol {
506+
def parse(msg: String): Try[ContainerCreationMessage] = Try(serdes.read(msg.parseJson))
507+
508+
private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
509+
private implicit val instanceIdSerdes = SchedulerInstanceId.serdes
510+
private implicit val byteSizeSerdes = size.serdes
511+
implicit val serdes = jsonFormat10(
512+
ContainerCreationMessage.apply(
513+
_: TransactionId,
514+
_: String,
515+
_: FullyQualifiedEntityName,
516+
_: DocRevision,
517+
_: WhiskActionMetaData,
518+
_: SchedulerInstanceId,
519+
_: String,
520+
_: Int,
521+
_: Int,
522+
_: CreationId))
523+
}
524+
525+
case class ContainerDeletionMessage(override val transid: TransactionId,
526+
invocationNamespace: String,
527+
action: FullyQualifiedEntityName,
528+
revision: DocRevision,
529+
whiskActionMetaData: WhiskActionMetaData)
530+
extends ContainerMessage(transid) {
531+
override def toJson: JsValue = ContainerDeletionMessage.serdes.write(this)
532+
override def serialize: String = toJson.compactPrint
533+
}
534+
535+
object ContainerDeletionMessage extends DefaultJsonProtocol {
536+
def parse(msg: String): Try[ContainerDeletionMessage] = Try(serdes.read(msg.parseJson))
537+
538+
private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
539+
private implicit val instanceIdSerdes = SchedulerInstanceId.serdes
540+
private implicit val byteSizeSerdes = size.serdes
541+
implicit val serdes = jsonFormat5(
542+
ContainerDeletionMessage
543+
.apply(_: TransactionId, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData))
544+
}
545+
546+
abstract class ContainerMessage(private val tid: TransactionId) extends Message {
547+
override val transid: TransactionId = tid
548+
override def serialize: String = ContainerMessage.serdes.write(this).compactPrint
549+
550+
/** Serializes the message to JSON. */
551+
def toJson: JsValue
552+
}
553+
554+
object ContainerMessage extends DefaultJsonProtocol {
555+
def parse(msg: String): Try[ContainerMessage] = Try(serdes.read(msg.parseJson))
556+
557+
implicit val serdes = new RootJsonFormat[ContainerMessage] {
558+
override def write(m: ContainerMessage): JsValue = m.toJson
559+
560+
override def read(json: JsValue): ContainerMessage = {
561+
val JsObject(fields) = json
562+
val creation = fields.contains("creationId")
563+
if (creation) {
564+
json.convertTo[ContainerCreationMessage]
565+
} else {
566+
json.convertTo[ContainerDeletionMessage]
567+
}
568+
}
569+
}
570+
}
571+
572+
sealed trait ContainerCreationError
573+
object ContainerCreationError extends Enumeration {
574+
case object NoAvailableInvokersError extends ContainerCreationError
575+
case object NoAvailableResourceInvokersError extends ContainerCreationError
576+
case object ResourceNotEnoughError extends ContainerCreationError
577+
case object WhiskError extends ContainerCreationError
578+
case object UnknownError extends ContainerCreationError
579+
case object TimeoutError extends ContainerCreationError
580+
case object ShuttingDownError extends ContainerCreationError
581+
case object NonExecutableActionError extends ContainerCreationError
582+
case object DBFetchError extends ContainerCreationError
583+
case object BlackBoxError extends ContainerCreationError
584+
case object ZeroNamespaceLimit extends ContainerCreationError
585+
case object TooManyConcurrentRequests extends ContainerCreationError
586+
587+
val whiskErrors: Set[ContainerCreationError] =
588+
Set(
589+
NoAvailableInvokersError,
590+
NoAvailableResourceInvokersError,
591+
ResourceNotEnoughError,
592+
WhiskError,
593+
ShuttingDownError,
594+
UnknownError,
595+
TimeoutError,
596+
ZeroNamespaceLimit)
597+
598+
private def parse(name: String) = name.toUpperCase match {
599+
case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
600+
case "NOAVAILABLERESOURCEINVOKERSERROR" => NoAvailableResourceInvokersError
601+
case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
602+
case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
603+
case "DBFETCHERROR" => DBFetchError
604+
case "WHISKERROR" => WhiskError
605+
case "BLACKBOXERROR" => BlackBoxError
606+
case "TIMEOUTERROR" => TimeoutError
607+
case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
608+
case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
609+
case "UNKNOWNERROR" => UnknownError
610+
}
611+
612+
implicit val serds = new RootJsonFormat[ContainerCreationError] {
613+
override def write(error: ContainerCreationError): JsValue = JsString(error.toString)
614+
override def read(json: JsValue): ContainerCreationError =
615+
Try {
616+
val JsString(str) = json
617+
ContainerCreationError.parse(str.trim.toUpperCase)
618+
} getOrElse {
619+
throw deserializationError("ContainerCreationError must be a valid string")
620+
}
621+
}
622+
}
623+
624+
case class ContainerCreationAckMessage(override val transid: TransactionId,
625+
creationId: CreationId,
626+
invocationNamespace: String,
627+
action: FullyQualifiedEntityName,
628+
revision: DocRevision,
629+
actionMetaData: WhiskActionMetaData,
630+
rootInvokerIndex: InvokerInstanceId,
631+
schedulerHost: String,
632+
rpcPort: Int,
633+
retryCount: Int = 0,
634+
error: Option[ContainerCreationError] = None,
635+
reason: Option[String] = None)
636+
extends Message {
637+
638+
/**
639+
* Serializes message to string. Must be idempotent.
640+
*/
641+
override def serialize: String = ContainerCreationAckMessage.serdes.write(this).compactPrint
642+
}
643+
644+
object ContainerCreationAckMessage extends DefaultJsonProtocol {
645+
def parse(msg: String): Try[ContainerCreationAckMessage] = Try(serdes.read(msg.parseJson))
646+
private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
647+
private implicit val byteSizeSerdes = size.serdes
648+
implicit val serdes = jsonFormat12(ContainerCreationAckMessage.apply)
649+
}

common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ object size {
163163
implicit val pureconfigReader =
164164
ConfigReader[ConfigValue].map(v => ByteSize(v.atKey("key").getBytes("key"), SizeUnits.BYTE))
165165

166-
protected[entity] implicit val serdes = new RootJsonFormat[ByteSize] {
166+
protected[core] implicit val serdes = new RootJsonFormat[ByteSize] {
167167
def write(b: ByteSize) = JsString(b.toString)
168168

169169
def read(value: JsValue): ByteSize = value match {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.containerpool.v2
19+
20+
import org.apache.openwhisk.core.connector.{
21+
ContainerCreationMessage,
22+
ContainerDeletionMessage
23+
}
24+
import org.apache.openwhisk.core.entity._
25+
26+
case class CreateContainer(creationMessage: ContainerCreationMessage, action: WhiskAction)
27+
case class DeleteContainer(deletionMessage: ContainerDeletionMessage)
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.invoker
19+
20+
import java.nio.charset.StandardCharsets
21+
22+
import akka.actor.{ActorRef, ActorSystem, Props}
23+
import org.apache.kafka.clients.producer.RecordMetadata
24+
import org.apache.openwhisk.common.{GracefulShutdown, Logging, TransactionId}
25+
import org.apache.openwhisk.core.WhiskConfig
26+
import org.apache.openwhisk.core.connector.ContainerCreationError.DBFetchError
27+
import org.apache.openwhisk.core.connector._
28+
import org.apache.openwhisk.core.containerpool.v2.{CreateContainer, DeleteContainer}
29+
import org.apache.openwhisk.core.database.{ArtifactStore, DocumentTypeMismatchException, DocumentUnreadable, NoDocumentException}
30+
import org.apache.openwhisk.core.entity._
31+
import org.apache.openwhisk.http.Messages
32+
33+
import scala.concurrent.duration._
34+
import scala.concurrent.{ExecutionContext, Future}
35+
import scala.util.{Failure, Success}
36+
37+
class ContainerMessageConsumer(
38+
invokerInstanceId: InvokerInstanceId,
39+
containerPool: ActorRef,
40+
entityStore: ArtifactStore[WhiskEntity],
41+
config: WhiskConfig,
42+
msgProvider: MessagingProvider,
43+
longPollDuration: FiniteDuration,
44+
maxPeek: Int,
45+
sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
46+
implicit actorSystem: ActorSystem,
47+
executionContext: ExecutionContext,
48+
logging: Logging) {
49+
50+
private val topic = s"${Invoker.topicPrefix}invoker${invokerInstanceId.toInt}"
51+
private val consumer =
52+
msgProvider.getConsumer(config, topic, topic, maxPeek, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
53+
54+
private def handler(bytes: Array[Byte]): Future[Unit] = Future {
55+
val raw = new String(bytes, StandardCharsets.UTF_8)
56+
ContainerMessage.parse(raw) match {
57+
case Success(creation: ContainerCreationMessage) =>
58+
implicit val transid: TransactionId = creation.transid
59+
logging.debug(
60+
this,
61+
s"creation message: ${creation.creationId} for ${creation.invocationNamespace}/${creation.action} is received")
62+
WhiskAction
63+
.get(entityStore, creation.action.toDocId, creation.revision, fromCache = true)
64+
.map { action =>
65+
containerPool ! CreateContainer(creation, action)
66+
feed ! MessageFeed.Processed
67+
}
68+
.recover {
69+
case t =>
70+
val message = t match {
71+
case _: NoDocumentException =>
72+
Messages.actionRemovedWhileInvoking
73+
case _: DocumentTypeMismatchException | _: DocumentUnreadable =>
74+
Messages.actionMismatchWhileInvoking
75+
case e: Throwable =>
76+
logging.error(this, s"An unknown DB connection error occurred while fetching an action: $e.")
77+
Messages.actionFetchErrorWhileInvoking
78+
}
79+
logging.error(
80+
this,
81+
s"creationId: ${creation.creationId}, failed to fetch action ${creation.invocationNamespace}/${creation.action}, error: $message")
82+
83+
val ack = ContainerCreationAckMessage(
84+
creation.transid,
85+
creation.creationId,
86+
creation.invocationNamespace,
87+
creation.action,
88+
creation.revision,
89+
creation.whiskActionMetaData,
90+
invokerInstanceId,
91+
creation.schedulerHost,
92+
creation.rpcPort,
93+
creation.retryCount,
94+
Some(DBFetchError),
95+
Some(message))
96+
sendAckToScheduler(creation.rootSchedulerIndex, ack)
97+
feed ! MessageFeed.Processed
98+
}
99+
case Success(deletion: ContainerDeletionMessage) =>
100+
implicit val transid: TransactionId = deletion.transid
101+
logging.info(this, s"deletion message for ${deletion.invocationNamespace}/${deletion.action} is received")
102+
containerPool ! DeleteContainer(deletion)
103+
feed ! MessageFeed.Processed
104+
case Failure(t) =>
105+
logging.error(this, s"Failed to parse $bytes, error: ${t.getMessage}")
106+
feed ! MessageFeed.Processed
107+
108+
case _ =>
109+
logging.error(this, s"Unexpected Container Message received by InvokerReactive: $raw")
110+
feed ! MessageFeed.Processed
111+
}
112+
}
113+
114+
private val feed = actorSystem.actorOf(Props {
115+
new MessageFeed("containerCreation", logging, consumer, maxPeek, longPollDuration, handler)
116+
})
117+
118+
def close(): Unit = {
119+
feed ! GracefulShutdown
120+
}
121+
}

0 commit comments

Comments
 (0)