Skip to content

Commit cf36299

Browse files
authored
[New Scheduler] Add memory queue for the new scheduler (#5110)
* Add SchedulingDecisionMaker * Add AverageRingBuffer to calculate the average execution time. * Add MemoryQueue * Remove the duplicate comment. * Apply comments * Explicitly export the scala version * Explicitly export the scala version * Use dotted expression. * Revert the scala version env * Add kryo dependency. * Fix import issues. * Fix import issues. * Remove duplicated codes * Update codes according to the new akka version. * Apply review comments. * Fix test case * Change kryo serialization library * Remove kryo * Remove empty newline * Add altoo kryo serialization library * Change the kryo serializer implementation * Fix test cases
1 parent 7e1caaa commit cf36299

File tree

13 files changed

+5896
-5
lines changed

13 files changed

+5896
-5
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.common
19+
20+
object AverageRingBuffer {
21+
def apply(maxSize: Int) = new AverageRingBuffer(maxSize)
22+
}
23+
24+
/**
25+
* This buffer provides the average of the given elements.
26+
* The number of elements are limited and the first element is removed if the maximum size is reached.
27+
* Since it is based on the Vector, its operation takes effectively constant time.
28+
* For more details, please visit https://docs.scala-lang.org/overviews/collections/performance-characteristics.html
29+
*
30+
* @param maxSize the maximum size of the buffer
31+
*/
32+
class AverageRingBuffer(private val maxSize: Int) {
33+
private var elements = Vector.empty[Double]
34+
private var sum = 0.0
35+
private var max = 0.0
36+
private var min = 0.0
37+
38+
def nonEmpty: Boolean = elements.nonEmpty
39+
40+
def average: Double = {
41+
val size = elements.size
42+
if (size > 2) {
43+
(sum - max - min) / (size - 2)
44+
} else {
45+
sum / size
46+
}
47+
}
48+
49+
def add(el: Double): Unit = synchronized {
50+
if (elements.size == maxSize) {
51+
sum = sum + el - elements.head
52+
elements = elements.tail :+ el
53+
} else {
54+
sum += el
55+
elements = elements :+ el
56+
}
57+
if (el > max) {
58+
max = el
59+
}
60+
if (el < min) {
61+
min = el
62+
}
63+
}
64+
65+
def size(): Int = elements.size
66+
}

common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,26 @@ object LoggingMarkers {
366366
*
367367
* MetricEmitter.emitCounterMetric(LoggingMarkers.MY_COUNTER(GreenCounter))
368368
*/
369+
def SCHEDULER_NAMESPACE_CONTAINER(namespace: String) =
370+
LogMarkerToken(scheduler, "namespaceContainer", counter, Some(namespace), Map("namespace" -> namespace))(
371+
MeasurementUnit.none)
372+
def SCHEDULER_NAMESPACE_INPROGRESS_CONTAINER(namespace: String) =
373+
LogMarkerToken(scheduler, "namespaceInProgressContainer", counter, Some(namespace), Map("namespace" -> namespace))(
374+
MeasurementUnit.none)
375+
def SCHEDULER_ACTION_CONTAINER(namespace: String, action: String) =
376+
LogMarkerToken(
377+
scheduler,
378+
"actionContainer",
379+
counter,
380+
Some(namespace),
381+
Map("namespace" -> namespace, "action" -> action))(MeasurementUnit.none)
382+
def SCHEDULER_ACTION_INPROGRESS_CONTAINER(namespace: String, action: String) =
383+
LogMarkerToken(
384+
scheduler,
385+
"actionInProgressContainer",
386+
counter,
387+
Some(namespace),
388+
Map("namespace" -> namespace, "action" -> action))(MeasurementUnit.none)
369389

370390
/*
371391
* Controller related markers

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ object ConfigKeys {
295295
val azBlob = "whisk.azure-blob"
296296

297297
val schedulerMaxPeek = "whisk.scheduler.max-peek"
298+
val schedulerQueue = "whisk.scheduler.queue"
298299
val schedulerQueueManager = "whisk.scheduler.queue-manager"
299300
val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention"
300301

common/scala/src/main/scala/org/apache/openwhisk/core/entity/CreationId.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,7 @@ protected[core] object CreationId {
8181
}
8282
}
8383
}
84+
85+
val systemPrefix = "cid_"
86+
val void = CreationId(systemPrefix + "void")
8487
}

common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ object Messages {
7171
s"Too many requests in the last minute (count: $count, allowed: $allowed)."
7272

7373
/** Standard message for too many concurrent activation requests within a time window. */
74+
val tooManyConcurrentRequests = s"Too many concurrent requests in flight."
7475
def tooManyConcurrentRequests(count: Int, allowed: Int) =
7576
s"Too many concurrent requests in flight (count: $count, allowed: $allowed)."
7677

@@ -225,6 +226,7 @@ object Messages {
225226
}
226227

227228
val namespacesBlacklisted = "The action was not invoked due to a blacklisted namespace."
229+
val namespaceLimitUnderZero = "The namespace limit is less than or equal to 0."
228230

229231
val actionRemovedWhileInvoking = "Action could not be found or may have been deleted."
230232
val actionMismatchWhileInvoking = "Action version is not compatible and cannot be invoked."

core/scheduler/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ dependencies {
6565
}
6666

6767
compile "org.scala-lang:scala-library:${gradle.scala.version}"
68+
compile "io.altoo:akka-kryo-serialization_${gradle.scala.depVersion}:1.0.0"
6869
compile project(':common:scala')
6970
}
7071

core/scheduler/src/main/resources/application.conf

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,63 @@
1515
# limitations under the License.
1616
#
1717

18+
19+
akka {
20+
actor {
21+
allow-java-serialization = off
22+
serializers {
23+
kryo = "io.altoo.akka.serialization.kryo.KryoSerializer"
24+
}
25+
serialization-bindings {
26+
"org.apache.openwhisk.core.scheduler.queue.CreateQueue" = kryo
27+
"org.apache.openwhisk.core.scheduler.queue.CreateQueueResponse" = kryo
28+
"org.apache.openwhisk.core.connector.ActivationMessage" = kryo
29+
}
30+
kryo {
31+
idstrategy = "automatic"
32+
classes = [
33+
"org.apache.openwhisk.core.scheduler.queue.CreateQueue",
34+
"org.apache.openwhisk.core.scheduler.queue.CreateQueueResponse",
35+
"org.apache.openwhisk.core.connector.ActivationMessage"
36+
]
37+
}
38+
}
39+
40+
remote.netty.tcp {
41+
send-buffer-size = 3151796b
42+
receive-buffer-size = 3151796b
43+
maximum-frame-size = 3151796b
44+
}
45+
}
46+
1847
whisk {
1948
# tracing configuration
2049
tracing {
2150
component = "Scheduler"
2251
}
2352

2453
fraction {
25-
managed-fraction: 90%
26-
blackbox-fraction: 10%
54+
managed-fraction: 90%
55+
blackbox-fraction: 10%
2756
}
2857

2958
scheduler {
59+
protocol = "http"
60+
username: "scheduler.user"
61+
password: "scheduler.pass"
62+
grpc {
63+
tls = "false"
64+
}
65+
queue {
66+
idle-grace = "20 seconds"
67+
stop-grace = "20 seconds"
68+
flush-grace = "60 seconds"
69+
graceful-shutdown-timeout = "5 seconds"
70+
max-retention-size = "10000"
71+
max-retention-ms = "60000"
72+
throttling-fraction = "0.9"
73+
duration-buffer-size = "10"
74+
}
3075
queue-manager {
3176
max-scheduling-time = "20 seconds"
3277
max-retries-to-get-queue = "13"

0 commit comments

Comments
 (0)