Skip to content

[New Scheduler] Add ActivationService #5070

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import spray.json.JsString
import spray.json.JsValue
import spray.json.RootJsonFormat
import spray.json.deserializationError
import spray.json._

import org.apache.openwhisk.core.entity.ArgNormalizer.trim

Expand Down Expand Up @@ -59,6 +60,7 @@ protected[core] class DocRevision private (val rev: String) extends AnyVal {
def asString = rev // to make explicit that this is a string conversion
def empty = rev == null
override def toString = rev
def serialize = DocRevision.serdes.write(this).compactPrint
}

/**
Expand Down Expand Up @@ -131,6 +133,8 @@ protected[core] object DocRevision {

protected[core] val empty: DocRevision = new DocRevision(null)

protected[core] def parse(msg: String) = Try(serdes.read(msg.parseJson))

implicit val serdes = new RootJsonFormat[DocRevision] {
def write(d: DocRevision) = if (d.rev != null) JsString(d.rev) else JsNull

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ protected[core] case class FullyQualifiedEntityName(path: EntityPath,
def namespace: EntityName = path.root
def qualifiedNameWithLeadingSlash: String = EntityPath.PATHSEP + qualifiedName
def asString = path.addPath(name) + version.map("@" + _.toString).getOrElse("")
def serialize = FullyQualifiedEntityName.serdes.write(this).compactPrint

override def size = qualifiedName.sizeInBytes
override def toString = asString
Expand Down Expand Up @@ -101,6 +102,8 @@ protected[core] object FullyQualifiedEntityName extends DefaultJsonProtocol {
}
}

protected[core] def parse(msg: String) = Try(serdes.read(msg.parseJson))

/**
* Converts the name to a fully qualified name.
* There are 3 cases:
Expand Down
29 changes: 29 additions & 0 deletions core/scheduler/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ apply plugin: 'application'
apply plugin: 'eclipse'
apply plugin: 'maven'
apply plugin: 'org.scoverage'
apply plugin: 'com.lightbend.akka.grpc.gradle'

ext.dockerImageName = 'scheduler'
apply from: '../../gradle/docker.gradle'
Expand All @@ -33,6 +34,20 @@ ext.coverageDirs = [
]
distDockerCoverage.dependsOn ':common:scala:scoverageClasses', 'scoverageClasses'

buildscript {
repositories {
mavenLocal()
maven {
url "https://plugins.gradle.org/m2/"
}
}
dependencies {
// see https://plugins.gradle.org/plugin/com.lightbend.akka.grpc.gradle
// for the currently latest version.
classpath 'gradle.plugin.com.lightbend.akka.grpc:akka-grpc-gradle-plugin:0.7.2'
}
}

// Define a separate configuration for managing the dependency on Jetty ALPN agent.
configurations {
alpnagent
Expand All @@ -51,7 +66,21 @@ dependencies {

compile "org.scala-lang:scala-library:${gradle.scala.version}"
compile project(':common:scala')
}

// workaround for akka-grpc
// https://github.com/akka/akka-grpc/issues/786
printProtocLogs.doFirst {
mkdir "$buildDir"
file("$buildDir/akka-grpc-gradle-plugin.log").text = "x"
mkdir "$project.rootDir/build"
file("$project.rootDir/build/akka-grpc-gradle-plugin.log").text = "x"
}
printProtocLogs.configure {
mkdir "$buildDir"
file("$buildDir/akka-grpc-gradle-plugin.log").text = "x"
mkdir "$project.rootDir/build"
file("$project.rootDir/build/akka-grpc-gradle-plugin.log").text = "x"
}

mainClassName = "org.apache.openwhisk.core.scheduler.Scheduler"
Expand Down
5 changes: 5 additions & 0 deletions core/scheduler/src/main/java/Empty.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
public class Empty {
// Workaround for this issue https://github.com/akka/akka-grpc/issues/289
// Gradle complains about no java sources.
// Note. Openwhisk is using a lower gradle version, so the latest akka-grpc version cannot be used.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we fix this? The issue has been fixed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've already tried it, but It requires a higher Gradle version to use the latest akka-grpc.

}
52 changes: 52 additions & 0 deletions core/scheduler/src/main/protobuf/activation.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
syntax = "proto3";
import "google/protobuf/wrappers.proto";

//#options
option java_multiple_files = true;
option java_package = "org.apache.openwhisk.grpc";
option java_outer_classname = "ActivationProto";

package activation;
//#options

//#services
service ActivationService {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove extra lines

rpc FetchActivation (FetchRequest) returns (FetchResponse) {}

rpc RescheduleActivation (RescheduleRequest) returns (RescheduleResponse) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the design document, RescheduleActivation rpc is for rescheduling when the container proxy cannot process messages, can give some example here? i think just describe it is enough.


}
//#services

//#messages
// The request message
message FetchRequest {
string invocationNamespace = 1;
string fqn = 2;
string rev = 3;
string containerId = 4;
bool warmed = 5;
// This allows optional value
google.protobuf.Int64Value lastDuration = 6;
// to record alive containers
bool alive = 7;
}

// The response message
message FetchResponse {
string activationMessage = 1;
}

message RescheduleRequest {
string invocationNamespace = 1;
string fqn = 2;
string rev = 3;
string activationMessage = 4;
}

message RescheduleResponse {
// if reschedule request is failed, then it will be `false`
bool isRescheduled = 1;
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package org.apache.openwhisk.core.scheduler.grpc

import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.connector.{ActivationMessage, Message}
import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName}
import org.apache.openwhisk.core.scheduler.queue._
import org.apache.openwhisk.grpc.{ActivationService, FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse}
import spray.json._

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.Try

class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Logging) extends ActivationService {
implicit val requestTimeout: Timeout = Timeout(50.seconds)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this not be hardcoded?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this not be hardcoded?

implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher

override def rescheduleActivation(request: RescheduleRequest): Future[RescheduleResponse] = {
logging.info(this, s"Try to reschedule activation ${request.invocationNamespace} ${request.fqn} ${request.rev}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be debug statement

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rescheduling is a special case that happens very occasionally. I think we can keep this log with the info level.

Future(for {
fqn <- FullyQualifiedEntityName.parse(request.fqn)
rev <- DocRevision.parse(request.rev)
msg <- ActivationMessage.parse(request.activationMessage)
} yield (fqn, rev, msg)).flatMap(Future.fromTry) flatMap { res =>
{
val key = res._1.toDocId.asDocInfo(res._2)
QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
case Some(queueValue) =>
// enqueue activation message to reschedule
logging.info(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be debug

this,
s"Enqueue activation message to reschedule ${request.invocationNamespace} ${request.fqn} ${request.rev}")
queueValue.queue ? res._3
Future.successful(RescheduleResponse(isRescheduled = true))
case None =>
logging.error(this, s"Queue not found for ${request.invocationNamespace} ${request.fqn} ${request.rev}")
Future.successful(RescheduleResponse())
}
}
}
}

override def fetchActivation(request: FetchRequest): Future[FetchResponse] = {
Future(for {
fqn <- FullyQualifiedEntityName.parse(request.fqn)
rev <- DocRevision.parse(request.rev)
} yield (fqn, rev)).flatMap(Future.fromTry) flatMap { res =>
val key = res._1.toDocId.asDocInfo(res._2)
QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
case Some(queueValue) =>
(queueValue.queue ? GetActivation(
res._1,
request.containerId,
request.warmed,
request.lastDuration,
request.alive))
.mapTo[ActivationResponse]
.map { response =>
FetchResponse(response.serialize)
}
.recover {
case t: Throwable =>
logging.error(this, s"Failed to get message from QueueManager, error: ${t.getMessage}")
FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize)
}
case None =>
if (QueuePool.keys.exists { mkey =>
mkey.invocationNamespace == request.invocationNamespace && mkey.docInfo.id == key.id
})
Future.successful(FetchResponse(ActivationResponse(Left(ActionMismatch())).serialize))
else
Future.successful(FetchResponse(ActivationResponse(Left(NoMemoryQueue())).serialize))
}
}
}
}

object ActivationServiceImpl {

def apply()(implicit actorSystem: ActorSystem, logging: Logging) =
new ActivationServiceImpl()
}

case class GetActivation(action: FullyQualifiedEntityName,
containerId: String,
warmed: Boolean,
lastDuration: Option[Long],
alive: Boolean = true)
case class ActivationResponse(message: Either[MemoryQueueError, ActivationMessage]) extends Message {
override def serialize = ActivationResponse.serdes.write(this).compactPrint
}

object ActivationResponse extends DefaultJsonProtocol {

private implicit val noMessageSerdes = NoActivationMessage.serdes
private implicit val noQueueSerdes = NoMemoryQueue.serdes
private implicit val mismatchSerdes = ActionMismatch.serdes
private implicit val messageSerdes = ActivationMessage.serdes
private implicit val memoryqueueuErrorSerdes = MemoryQueueErrorSerdes.memoryQueueErrorFormat

def parse(msg: String) = Try(serdes.read(msg.parseJson))

implicit def rootEitherFormat[A: RootJsonFormat, B: RootJsonFormat] =
new RootJsonFormat[Either[A, B]] {
val format = DefaultJsonProtocol.eitherFormat[A, B]

def write(either: Either[A, B]) = format.write(either)

def read(value: JsValue) = format.read(value)
}

type ActivationResponse = Either[MemoryQueueError, ActivationMessage]
implicit val serdes = jsonFormat(ActivationResponse.apply _, "message")

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package org.apache.openwhisk.core.scheduler.queue

import akka.actor.ActorRef
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.entity._
import spray.json.{DefaultJsonProtocol, _}
import scala.collection.concurrent.TrieMap
import scala.util.Try

object QueueSize
case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo)
case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean)

sealed trait MemoryQueueError extends Product {
val causedBy: String
}

object MemoryQueueErrorSerdes {

private implicit val noMessageSerdes = NoActivationMessage.serdes
private implicit val noQueueSerdes = NoMemoryQueue.serdes
private implicit val mismatchSerdes = ActionMismatch.serdes

// format that discriminates based on an additional
// field "type" that can either be "Cat" or "Dog"
implicit val memoryQueueErrorFormat = new RootJsonFormat[MemoryQueueError] {
def write(obj: MemoryQueueError): JsValue =
JsObject((obj match {
case msg: NoActivationMessage => msg.toJson
case msg: NoMemoryQueue => msg.toJson
case msg: ActionMismatch => msg.toJson
}).asJsObject.fields + ("type" -> JsString(obj.productPrefix)))

def read(json: JsValue): MemoryQueueError =
json.asJsObject.getFields("type") match {
case Seq(JsString("NoActivationMessage")) => json.convertTo[NoActivationMessage]
case Seq(JsString("NoMemoryQueue")) => json.convertTo[NoMemoryQueue]
case Seq(JsString("ActionMismatch")) => json.convertTo[ActionMismatch]
}
}
}

case class NoActivationMessage(noActivationMessage: String = NoActivationMessage.asString)
extends MemoryQueueError
with Message {
override val causedBy: String = noActivationMessage
override def serialize = NoActivationMessage.serdes.write(this).compactPrint
}

object NoActivationMessage extends DefaultJsonProtocol {
val asString: String = "no activation message exist"
def parse(msg: String) = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat(NoActivationMessage.apply _, "noActivationMessage")
}

case class NoMemoryQueue(noMemoryQueue: String = NoMemoryQueue.asString) extends MemoryQueueError with Message {
override val causedBy: String = noMemoryQueue
override def serialize = NoMemoryQueue.serdes.write(this).compactPrint
}

object NoMemoryQueue extends DefaultJsonProtocol {
val asString: String = "no memory queue exist"
def parse(msg: String) = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat(NoMemoryQueue.apply _, "noMemoryQueue")
}

case class ActionMismatch(actionMisMatch: String = ActionMismatch.asString) extends MemoryQueueError with Message {
override val causedBy: String = actionMisMatch
override def serialize = ActionMismatch.serdes.write(this).compactPrint
}

object ActionMismatch extends DefaultJsonProtocol {
val asString: String = "action version does not match"
def parse(msg: String) = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat(ActionMismatch.apply _, "actionMisMatch")
}

object QueuePool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this safe as a global static object being accessed from futures?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only field _queuePool(TrieMap[MemoryQueueKey, MemoryQueueValue]) which is included in this object is a thread-safe.

private val _queuePool = TrieMap[MemoryQueueKey, MemoryQueueValue]()

private[scheduler] def get(key: MemoryQueueKey) = _queuePool.get(key)

private[scheduler] def put(key: MemoryQueueKey, value: MemoryQueueValue) = _queuePool.put(key, value)

private[scheduler] def remove(key: MemoryQueueKey) = _queuePool.remove(key)

private[scheduler] def countLeader() = _queuePool.count(_._2.isLeader)

private[scheduler] def clear(): Unit = _queuePool.clear()

private[scheduler] def size = _queuePool.size

private[scheduler] def values = _queuePool.values

private[scheduler] def keys = _queuePool.keys
}

case class CreateQueue(invocationNamespace: String,
fqn: FullyQualifiedEntityName,
revision: DocRevision,
whiskActionMetaData: WhiskActionMetaData)
case class CreateQueueResponse(invocationNamespace: String, fqn: FullyQualifiedEntityName, success: Boolean)
Loading