Skip to content

Commit cd6fded

Browse files
authored
[New Scheduler] Add ActivationService (#5070)
* Add ActivationService for scheduler * Add annotation * Add license header * Reformat activation.proto * Reduce request timeout * Add license header * Scan code before compiling the code
1 parent 6e5850f commit cd6fded

File tree

11 files changed

+594
-2
lines changed

11 files changed

+594
-2
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import spray.json.JsString
2525
import spray.json.JsValue
2626
import spray.json.RootJsonFormat
2727
import spray.json.deserializationError
28+
import spray.json._
2829

2930
import org.apache.openwhisk.core.entity.ArgNormalizer.trim
3031

@@ -59,6 +60,7 @@ protected[core] class DocRevision private (val rev: String) extends AnyVal {
5960
def asString = rev // to make explicit that this is a string conversion
6061
def empty = rev == null
6162
override def toString = rev
63+
def serialize = DocRevision.serdes.write(this).compactPrint
6264
}
6365

6466
/**
@@ -131,6 +133,8 @@ protected[core] object DocRevision {
131133

132134
protected[core] val empty: DocRevision = new DocRevision(null)
133135

136+
protected[core] def parse(msg: String) = Try(serdes.read(msg.parseJson))
137+
134138
implicit val serdes = new RootJsonFormat[DocRevision] {
135139
def write(d: DocRevision) = if (d.rev != null) JsString(d.rev) else JsNull
136140

common/scala/src/main/scala/org/apache/openwhisk/core/entity/FullyQualifiedEntityName.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ protected[core] case class FullyQualifiedEntityName(path: EntityPath,
5656
def namespace: EntityName = path.root
5757
def qualifiedNameWithLeadingSlash: String = EntityPath.PATHSEP + qualifiedName
5858
def asString = path.addPath(name) + version.map("@" + _.toString).getOrElse("")
59+
def serialize = FullyQualifiedEntityName.serdes.write(this).compactPrint
5960

6061
override def size = qualifiedName.sizeInBytes
6162
override def toString = asString
@@ -101,6 +102,8 @@ protected[core] object FullyQualifiedEntityName extends DefaultJsonProtocol {
101102
}
102103
}
103104

105+
protected[core] def parse(msg: String) = Try(serdes.read(msg.parseJson))
106+
104107
/**
105108
* Converts the name to a fully qualified name.
106109
* There are 3 cases:

core/scheduler/build.gradle

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ apply plugin: 'application'
2020
apply plugin: 'eclipse'
2121
apply plugin: 'maven'
2222
apply plugin: 'org.scoverage'
23+
apply plugin: 'com.lightbend.akka.grpc.gradle'
2324

2425
ext.dockerImageName = 'scheduler'
2526
apply from: '../../gradle/docker.gradle'
@@ -33,6 +34,20 @@ ext.coverageDirs = [
3334
]
3435
distDockerCoverage.dependsOn ':common:scala:scoverageClasses', 'scoverageClasses'
3536

37+
buildscript {
38+
repositories {
39+
mavenLocal()
40+
maven {
41+
url "https://plugins.gradle.org/m2/"
42+
}
43+
}
44+
dependencies {
45+
// see https://plugins.gradle.org/plugin/com.lightbend.akka.grpc.gradle
46+
// for the currently latest version.
47+
classpath 'gradle.plugin.com.lightbend.akka.grpc:akka-grpc-gradle-plugin:0.7.2'
48+
}
49+
}
50+
3651
// Define a separate configuration for managing the dependency on Jetty ALPN agent.
3752
configurations {
3853
alpnagent
@@ -51,7 +66,21 @@ dependencies {
5166

5267
compile "org.scala-lang:scala-library:${gradle.scala.version}"
5368
compile project(':common:scala')
69+
}
5470

71+
// workaround for akka-grpc
72+
// https://github.com/akka/akka-grpc/issues/786
73+
printProtocLogs.doFirst {
74+
mkdir "$buildDir"
75+
file("$buildDir/akka-grpc-gradle-plugin.log").text = "x"
76+
mkdir "$project.rootDir/build"
77+
file("$project.rootDir/build/akka-grpc-gradle-plugin.log").text = "x"
78+
}
79+
printProtocLogs.configure {
80+
mkdir "$buildDir"
81+
file("$buildDir/akka-grpc-gradle-plugin.log").text = "x"
82+
mkdir "$project.rootDir/build"
83+
file("$project.rootDir/build/akka-grpc-gradle-plugin.log").text = "x"
5584
}
5685

5786
mainClassName = "org.apache.openwhisk.core.scheduler.Scheduler"
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
public class Empty {
19+
// Workaround for this issue https://github.com/akka/akka-grpc/issues/289
20+
// Gradle complains about no java sources.
21+
// Note. Openwhisk is using a lower gradle version, so the latest akka-grpc version cannot be used.
22+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
syntax = "proto3";
19+
import "google/protobuf/wrappers.proto";
20+
21+
//#options
22+
option java_multiple_files = true;
23+
option java_package = "org.apache.openwhisk.grpc";
24+
option java_outer_classname = "ActivationProto";
25+
26+
package activation;
27+
//#options
28+
29+
//#services
30+
service ActivationService {
31+
rpc FetchActivation (FetchRequest) returns (FetchResponse) {}
32+
rpc RescheduleActivation (RescheduleRequest) returns (RescheduleResponse) {}
33+
}
34+
//#services
35+
36+
//#messages
37+
// The request message
38+
message FetchRequest {
39+
string invocationNamespace = 1;
40+
string fqn = 2;
41+
string rev = 3;
42+
string containerId = 4;
43+
bool warmed = 5;
44+
// This allows optional value
45+
google.protobuf.Int64Value lastDuration = 6;
46+
// to record alive containers
47+
bool alive = 7;
48+
}
49+
50+
// The response message
51+
message FetchResponse {
52+
string activationMessage = 1;
53+
}
54+
55+
message RescheduleRequest {
56+
string invocationNamespace = 1;
57+
string fqn = 2;
58+
string rev = 3;
59+
string activationMessage = 4;
60+
}
61+
62+
message RescheduleResponse {
63+
// if reschedule request is failed, then it will be `false`
64+
bool isRescheduled = 1;
65+
}
66+
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.scheduler.grpc
19+
20+
import akka.actor.ActorSystem
21+
import akka.pattern.ask
22+
import akka.util.Timeout
23+
import org.apache.openwhisk.common.Logging
24+
import org.apache.openwhisk.core.connector.{ActivationMessage, Message}
25+
import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName}
26+
import org.apache.openwhisk.core.scheduler.queue._
27+
import org.apache.openwhisk.grpc.{ActivationService, FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse}
28+
import spray.json._
29+
30+
import scala.concurrent.duration._
31+
import scala.concurrent.{ExecutionContextExecutor, Future}
32+
import scala.util.Try
33+
34+
class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Logging) extends ActivationService {
35+
implicit val requestTimeout: Timeout = Timeout(5.seconds)
36+
implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
37+
38+
override def rescheduleActivation(request: RescheduleRequest): Future[RescheduleResponse] = {
39+
logging.info(this, s"Try to reschedule activation ${request.invocationNamespace} ${request.fqn} ${request.rev}")
40+
Future(for {
41+
fqn <- FullyQualifiedEntityName.parse(request.fqn)
42+
rev <- DocRevision.parse(request.rev)
43+
msg <- ActivationMessage.parse(request.activationMessage)
44+
} yield (fqn, rev, msg)).flatMap(Future.fromTry) flatMap { res =>
45+
{
46+
val key = res._1.toDocId.asDocInfo(res._2)
47+
QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
48+
case Some(queueValue) =>
49+
// enqueue activation message to reschedule
50+
logging.info(
51+
this,
52+
s"Enqueue activation message to reschedule ${request.invocationNamespace} ${request.fqn} ${request.rev}")
53+
queueValue.queue ? res._3
54+
Future.successful(RescheduleResponse(isRescheduled = true))
55+
case None =>
56+
logging.error(this, s"Queue not found for ${request.invocationNamespace} ${request.fqn} ${request.rev}")
57+
Future.successful(RescheduleResponse())
58+
}
59+
}
60+
}
61+
}
62+
63+
override def fetchActivation(request: FetchRequest): Future[FetchResponse] = {
64+
Future(for {
65+
fqn <- FullyQualifiedEntityName.parse(request.fqn)
66+
rev <- DocRevision.parse(request.rev)
67+
} yield (fqn, rev)).flatMap(Future.fromTry) flatMap { res =>
68+
val key = res._1.toDocId.asDocInfo(res._2)
69+
QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
70+
case Some(queueValue) =>
71+
(queueValue.queue ? GetActivation(
72+
res._1,
73+
request.containerId,
74+
request.warmed,
75+
request.lastDuration,
76+
request.alive))
77+
.mapTo[ActivationResponse]
78+
.map { response =>
79+
FetchResponse(response.serialize)
80+
}
81+
.recover {
82+
case t: Throwable =>
83+
logging.error(this, s"Failed to get message from QueueManager, error: ${t.getMessage}")
84+
FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize)
85+
}
86+
case None =>
87+
if (QueuePool.keys.exists { mkey =>
88+
mkey.invocationNamespace == request.invocationNamespace && mkey.docInfo.id == key.id
89+
})
90+
Future.successful(FetchResponse(ActivationResponse(Left(ActionMismatch())).serialize))
91+
else
92+
Future.successful(FetchResponse(ActivationResponse(Left(NoMemoryQueue())).serialize))
93+
}
94+
}
95+
}
96+
}
97+
98+
object ActivationServiceImpl {
99+
100+
def apply()(implicit actorSystem: ActorSystem, logging: Logging) =
101+
new ActivationServiceImpl()
102+
}
103+
104+
case class GetActivation(action: FullyQualifiedEntityName,
105+
containerId: String,
106+
warmed: Boolean,
107+
lastDuration: Option[Long],
108+
alive: Boolean = true)
109+
case class ActivationResponse(message: Either[MemoryQueueError, ActivationMessage]) extends Message {
110+
override def serialize = ActivationResponse.serdes.write(this).compactPrint
111+
}
112+
113+
object ActivationResponse extends DefaultJsonProtocol {
114+
115+
private implicit val noMessageSerdes = NoActivationMessage.serdes
116+
private implicit val noQueueSerdes = NoMemoryQueue.serdes
117+
private implicit val mismatchSerdes = ActionMismatch.serdes
118+
private implicit val messageSerdes = ActivationMessage.serdes
119+
private implicit val memoryqueueuErrorSerdes = MemoryQueueErrorSerdes.memoryQueueErrorFormat
120+
121+
def parse(msg: String) = Try(serdes.read(msg.parseJson))
122+
123+
implicit def rootEitherFormat[A: RootJsonFormat, B: RootJsonFormat] =
124+
new RootJsonFormat[Either[A, B]] {
125+
val format = DefaultJsonProtocol.eitherFormat[A, B]
126+
127+
def write(either: Either[A, B]) = format.write(either)
128+
129+
def read(value: JsValue) = format.read(value)
130+
}
131+
132+
type ActivationResponse = Either[MemoryQueueError, ActivationMessage]
133+
implicit val serdes = jsonFormat(ActivationResponse.apply _, "message")
134+
135+
}

0 commit comments

Comments
 (0)