Skip to content

Commit 8f3f04d

Browse files
authored
Merge f05bf00 into 4e2dea1
2 parents 4e2dea1 + f05bf00 commit 8f3f04d

File tree

2 files changed

+14
-17
lines changed

2 files changed

+14
-17
lines changed

common/scala/src/main/scala/org/apache/openwhisk/common/AverageRingBuffer.scala

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,12 @@ object AverageRingBuffer {
3232
class AverageRingBuffer(private val maxSize: Int) {
3333
private var elements = Vector.empty[Double]
3434
private var sum = 0.0
35-
private var max = 0.0
36-
private var min = 0.0
3735

3836
def nonEmpty: Boolean = elements.nonEmpty
3937

4038
def average: Double = {
4139
val size = elements.size
42-
if (size > 2) {
43-
(sum - max - min) / (size - 2)
44-
} else {
45-
sum / size
46-
}
40+
sum / size
4741
}
4842

4943
def add(el: Double): Unit = synchronized {
@@ -54,12 +48,6 @@ class AverageRingBuffer(private val maxSize: Int) {
5448
sum += el
5549
elements = elements :+ el
5650
}
57-
if (el > max) {
58-
max = el
59-
}
60-
if (el < min) {
61-
min = el
62-
}
6351
}
6452

6553
def size(): Int = elements.size

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,19 +153,24 @@ class SchedulingDecisionMaker(
153153
Running)
154154
// need more containers and a message is already processed
155155
case (Running, Some(duration)) =>
156-
// we can safely get the value as we already checked the existence
157-
val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
156+
// we can safely get the value as we already checked the existence, have extra protection in case duration is somehow negative
157+
val containerThroughput = if (duration <= 0) {
158+
maxActionConcurrency
159+
} else {
160+
(staleThreshold / duration) * maxActionConcurrency
161+
}
158162
val expectedTps = containerThroughput * (existing + inProgress)
159163
val availableNonStaleActivations = availableMsg - staleActivationNum
160164

161165
var staleContainerProvision = 0
162166
if (staleActivationNum > 0) {
167+
163168
val num = ceiling(staleActivationNum.toDouble / containerThroughput)
164169
// if it tries to create more containers than existing messages, we just create shortage
165170
staleContainerProvision = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
166171
}
167172

168-
if (availableNonStaleActivations >= expectedTps && existing + inProgress < availableNonStaleActivations) {
173+
if (availableNonStaleActivations >= expectedTps && existing + inProgress < availableNonStaleActivations && duration > 0) {
169174
val num = ceiling((availableNonStaleActivations / containerThroughput) - existing - inProgress)
170175
// if it tries to create more containers than existing messages, we just create shortage
171176
val actualNum =
@@ -202,7 +207,11 @@ class SchedulingDecisionMaker(
202207
// this case is for that as a last resort.
203208
case (Removing, Some(duration)) if staleActivationNum > 0 =>
204209
// we can safely get the value as we already checked the existence
205-
val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
210+
val containerThroughput = if (duration <= 0) {
211+
maxActionConcurrency
212+
} else {
213+
(staleThreshold / duration) * maxActionConcurrency
214+
}
206215
val num = ceiling(staleActivationNum.toDouble / containerThroughput)
207216
// if it tries to create more containers than existing messages, we just create shortage
208217
val actualNum = (if (num > staleActivationNum) staleActivationNum else num) - inProgress

0 commit comments

Comments
 (0)