-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[New Scheduler] Add ActivationService #5070
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
1cd9853
31085d3
1941ffb
6ec11c4
83f8313
2a4c69e
4d9c7c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
public class Empty { | ||
// Workaround for this issue https://github.com/akka/akka-grpc/issues/289 | ||
// Gradle complains about no java sources. | ||
// Note. Openwhisk is using a lower gradle version, so the latest akka-grpc version cannot be used. | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
syntax = "proto3"; | ||
import "google/protobuf/wrappers.proto"; | ||
|
||
//#options | ||
option java_multiple_files = true; | ||
option java_package = "org.apache.openwhisk.grpc"; | ||
option java_outer_classname = "ActivationProto"; | ||
|
||
package activation; | ||
//#options | ||
|
||
//#services | ||
service ActivationService { | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: remove extra lines |
||
rpc FetchActivation (FetchRequest) returns (FetchResponse) {} | ||
|
||
rpc RescheduleActivation (RescheduleRequest) returns (RescheduleResponse) {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the design document, |
||
|
||
} | ||
//#services | ||
|
||
//#messages | ||
// The request message | ||
message FetchRequest { | ||
string invocationNamespace = 1; | ||
string fqn = 2; | ||
string rev = 3; | ||
string containerId = 4; | ||
bool warmed = 5; | ||
// This allows optional value | ||
google.protobuf.Int64Value lastDuration = 6; | ||
// to record alive containers | ||
bool alive = 7; | ||
} | ||
|
||
// The response message | ||
message FetchResponse { | ||
string activationMessage = 1; | ||
} | ||
|
||
message RescheduleRequest { | ||
string invocationNamespace = 1; | ||
string fqn = 2; | ||
string rev = 3; | ||
string activationMessage = 4; | ||
} | ||
|
||
message RescheduleResponse { | ||
// if reschedule request is failed, then it will be `false` | ||
bool isRescheduled = 1; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
package org.apache.openwhisk.core.scheduler.grpc | ||
|
||
import akka.actor.ActorSystem | ||
import akka.pattern.ask | ||
import akka.util.Timeout | ||
import org.apache.openwhisk.common.Logging | ||
import org.apache.openwhisk.core.connector.{ActivationMessage, Message} | ||
import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName} | ||
import org.apache.openwhisk.core.scheduler.queue._ | ||
import org.apache.openwhisk.grpc.{ActivationService, FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse} | ||
import spray.json._ | ||
|
||
import scala.concurrent.duration._ | ||
import scala.concurrent.{ExecutionContextExecutor, Future} | ||
import scala.util.Try | ||
|
||
class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Logging) extends ActivationService { | ||
implicit val requestTimeout: Timeout = Timeout(50.seconds) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this not be hardcoded? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this not be hardcoded? |
||
implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher | ||
|
||
override def rescheduleActivation(request: RescheduleRequest): Future[RescheduleResponse] = { | ||
logging.info(this, s"Try to reschedule activation ${request.invocationNamespace} ${request.fqn} ${request.rev}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rescheduling is a special case that happens very occasionally. I think we can keep this log with the |
||
Future(for { | ||
fqn <- FullyQualifiedEntityName.parse(request.fqn) | ||
rev <- DocRevision.parse(request.rev) | ||
msg <- ActivationMessage.parse(request.activationMessage) | ||
} yield (fqn, rev, msg)).flatMap(Future.fromTry) flatMap { res => | ||
{ | ||
val key = res._1.toDocId.asDocInfo(res._2) | ||
QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match { | ||
case Some(queueValue) => | ||
// enqueue activation message to reschedule | ||
logging.info( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be |
||
this, | ||
s"Enqueue activation message to reschedule ${request.invocationNamespace} ${request.fqn} ${request.rev}") | ||
queueValue.queue ? res._3 | ||
Future.successful(RescheduleResponse(isRescheduled = true)) | ||
case None => | ||
logging.error(this, s"Queue not found for ${request.invocationNamespace} ${request.fqn} ${request.rev}") | ||
Future.successful(RescheduleResponse()) | ||
} | ||
} | ||
} | ||
} | ||
|
||
override def fetchActivation(request: FetchRequest): Future[FetchResponse] = { | ||
Future(for { | ||
fqn <- FullyQualifiedEntityName.parse(request.fqn) | ||
rev <- DocRevision.parse(request.rev) | ||
} yield (fqn, rev)).flatMap(Future.fromTry) flatMap { res => | ||
val key = res._1.toDocId.asDocInfo(res._2) | ||
QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match { | ||
case Some(queueValue) => | ||
(queueValue.queue ? GetActivation( | ||
res._1, | ||
request.containerId, | ||
request.warmed, | ||
request.lastDuration, | ||
request.alive)) | ||
.mapTo[ActivationResponse] | ||
.map { response => | ||
FetchResponse(response.serialize) | ||
} | ||
.recover { | ||
case t: Throwable => | ||
logging.error(this, s"Failed to get message from QueueManager, error: ${t.getMessage}") | ||
FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize) | ||
} | ||
case None => | ||
if (QueuePool.keys.exists { mkey => | ||
mkey.invocationNamespace == request.invocationNamespace && mkey.docInfo.id == key.id | ||
}) | ||
Future.successful(FetchResponse(ActivationResponse(Left(ActionMismatch())).serialize)) | ||
else | ||
Future.successful(FetchResponse(ActivationResponse(Left(NoMemoryQueue())).serialize)) | ||
} | ||
} | ||
} | ||
} | ||
|
||
object ActivationServiceImpl { | ||
|
||
def apply()(implicit actorSystem: ActorSystem, logging: Logging) = | ||
new ActivationServiceImpl() | ||
} | ||
|
||
case class GetActivation(action: FullyQualifiedEntityName, | ||
containerId: String, | ||
warmed: Boolean, | ||
lastDuration: Option[Long], | ||
alive: Boolean = true) | ||
case class ActivationResponse(message: Either[MemoryQueueError, ActivationMessage]) extends Message { | ||
override def serialize = ActivationResponse.serdes.write(this).compactPrint | ||
} | ||
|
||
object ActivationResponse extends DefaultJsonProtocol { | ||
|
||
private implicit val noMessageSerdes = NoActivationMessage.serdes | ||
private implicit val noQueueSerdes = NoMemoryQueue.serdes | ||
private implicit val mismatchSerdes = ActionMismatch.serdes | ||
private implicit val messageSerdes = ActivationMessage.serdes | ||
private implicit val memoryqueueuErrorSerdes = MemoryQueueErrorSerdes.memoryQueueErrorFormat | ||
|
||
def parse(msg: String) = Try(serdes.read(msg.parseJson)) | ||
|
||
implicit def rootEitherFormat[A: RootJsonFormat, B: RootJsonFormat] = | ||
new RootJsonFormat[Either[A, B]] { | ||
val format = DefaultJsonProtocol.eitherFormat[A, B] | ||
|
||
def write(either: Either[A, B]) = format.write(either) | ||
|
||
def read(value: JsValue) = format.read(value) | ||
} | ||
|
||
type ActivationResponse = Either[MemoryQueueError, ActivationMessage] | ||
implicit val serdes = jsonFormat(ActivationResponse.apply _, "message") | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package org.apache.openwhisk.core.scheduler.queue | ||
|
||
import akka.actor.ActorRef | ||
import org.apache.openwhisk.core.connector._ | ||
import org.apache.openwhisk.core.entity._ | ||
import spray.json.{DefaultJsonProtocol, _} | ||
import scala.collection.concurrent.TrieMap | ||
import scala.util.Try | ||
|
||
object QueueSize | ||
case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo) | ||
case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean) | ||
|
||
sealed trait MemoryQueueError extends Product { | ||
val causedBy: String | ||
} | ||
|
||
object MemoryQueueErrorSerdes { | ||
|
||
private implicit val noMessageSerdes = NoActivationMessage.serdes | ||
private implicit val noQueueSerdes = NoMemoryQueue.serdes | ||
private implicit val mismatchSerdes = ActionMismatch.serdes | ||
|
||
// format that discriminates based on an additional | ||
// field "type" that can either be "Cat" or "Dog" | ||
implicit val memoryQueueErrorFormat = new RootJsonFormat[MemoryQueueError] { | ||
def write(obj: MemoryQueueError): JsValue = | ||
JsObject((obj match { | ||
case msg: NoActivationMessage => msg.toJson | ||
case msg: NoMemoryQueue => msg.toJson | ||
case msg: ActionMismatch => msg.toJson | ||
}).asJsObject.fields + ("type" -> JsString(obj.productPrefix))) | ||
|
||
def read(json: JsValue): MemoryQueueError = | ||
json.asJsObject.getFields("type") match { | ||
case Seq(JsString("NoActivationMessage")) => json.convertTo[NoActivationMessage] | ||
case Seq(JsString("NoMemoryQueue")) => json.convertTo[NoMemoryQueue] | ||
case Seq(JsString("ActionMismatch")) => json.convertTo[ActionMismatch] | ||
} | ||
} | ||
} | ||
|
||
case class NoActivationMessage(noActivationMessage: String = NoActivationMessage.asString) | ||
extends MemoryQueueError | ||
with Message { | ||
override val causedBy: String = noActivationMessage | ||
override def serialize = NoActivationMessage.serdes.write(this).compactPrint | ||
} | ||
|
||
object NoActivationMessage extends DefaultJsonProtocol { | ||
val asString: String = "no activation message exist" | ||
def parse(msg: String) = Try(serdes.read(msg.parseJson)) | ||
implicit val serdes = jsonFormat(NoActivationMessage.apply _, "noActivationMessage") | ||
} | ||
|
||
case class NoMemoryQueue(noMemoryQueue: String = NoMemoryQueue.asString) extends MemoryQueueError with Message { | ||
override val causedBy: String = noMemoryQueue | ||
override def serialize = NoMemoryQueue.serdes.write(this).compactPrint | ||
} | ||
|
||
object NoMemoryQueue extends DefaultJsonProtocol { | ||
val asString: String = "no memory queue exist" | ||
def parse(msg: String) = Try(serdes.read(msg.parseJson)) | ||
implicit val serdes = jsonFormat(NoMemoryQueue.apply _, "noMemoryQueue") | ||
} | ||
|
||
case class ActionMismatch(actionMisMatch: String = ActionMismatch.asString) extends MemoryQueueError with Message { | ||
override val causedBy: String = actionMisMatch | ||
override def serialize = ActionMismatch.serdes.write(this).compactPrint | ||
} | ||
|
||
object ActionMismatch extends DefaultJsonProtocol { | ||
val asString: String = "action version does not match" | ||
def parse(msg: String) = Try(serdes.read(msg.parseJson)) | ||
implicit val serdes = jsonFormat(ActionMismatch.apply _, "actionMisMatch") | ||
} | ||
|
||
object QueuePool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this safe as a global static object being accessed from futures? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The only field _queuePool(TrieMap[MemoryQueueKey, MemoryQueueValue]) which is included in this object is a thread-safe. |
||
private val _queuePool = TrieMap[MemoryQueueKey, MemoryQueueValue]() | ||
|
||
private[scheduler] def get(key: MemoryQueueKey) = _queuePool.get(key) | ||
|
||
private[scheduler] def put(key: MemoryQueueKey, value: MemoryQueueValue) = _queuePool.put(key, value) | ||
|
||
private[scheduler] def remove(key: MemoryQueueKey) = _queuePool.remove(key) | ||
|
||
private[scheduler] def countLeader() = _queuePool.count(_._2.isLeader) | ||
|
||
private[scheduler] def clear(): Unit = _queuePool.clear() | ||
|
||
private[scheduler] def size = _queuePool.size | ||
|
||
private[scheduler] def values = _queuePool.values | ||
|
||
private[scheduler] def keys = _queuePool.keys | ||
} | ||
|
||
case class CreateQueue(invocationNamespace: String, | ||
fqn: FullyQualifiedEntityName, | ||
revision: DocRevision, | ||
whiskActionMetaData: WhiskActionMetaData) | ||
case class CreateQueueResponse(invocationNamespace: String, fqn: FullyQualifiedEntityName, success: Boolean) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we fix this? The issue has been fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've already tried it, but It requires a higher Gradle version to use the latest akka-grpc.