Skip to content

fix bug in average ring buffer and add negative duration protection to scheduler decision maker #5396

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,12 @@ object AverageRingBuffer {
class AverageRingBuffer(private val maxSize: Int) {
private var elements = Vector.empty[Double]
private var sum = 0.0
private var max = 0.0
private var min = 0.0

def nonEmpty: Boolean = elements.nonEmpty

def average: Double = {
val size = elements.size
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any chance size is 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the only place this function is called, it does a nonEmpty check first or skips over it. Now debatably that check should also be happening inside this function for sure to prevent the divide by zero; but since it's not an existing issue for the moment I'm going to merge this in to get this fixed.

if (size > 2) {
(sum - max - min) / (size - 2)
} else {
sum / size
}
sum / size
}

def add(el: Double): Unit = synchronized {
Expand All @@ -54,12 +48,6 @@ class AverageRingBuffer(private val maxSize: Int) {
sum += el
elements = elements :+ el
}
if (el > max) {
max = el
}
if (el < min) {
min = el
}
}

def size(): Int = elements.size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,24 @@ class SchedulingDecisionMaker(
Running)
// need more containers and a message is already processed
case (Running, Some(duration)) =>
// we can safely get the value as we already checked the existence
val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
// we can safely get the value as we already checked the existence, have extra protection in case duration is somehow negative
val containerThroughput = if (duration <= 0) {
maxActionConcurrency
} else {
(staleThreshold / duration) * maxActionConcurrency
}
val expectedTps = containerThroughput * (existing + inProgress)
val availableNonStaleActivations = availableMsg - staleActivationNum

var staleContainerProvision = 0
if (staleActivationNum > 0) {

val num = ceiling(staleActivationNum.toDouble / containerThroughput)
// if it tries to create more containers than existing messages, we just create shortage
staleContainerProvision = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
}

if (availableNonStaleActivations >= expectedTps && existing + inProgress < availableNonStaleActivations) {
if (availableNonStaleActivations >= expectedTps && existing + inProgress < availableNonStaleActivations && duration > 0) {
val num = ceiling((availableNonStaleActivations / containerThroughput) - existing - inProgress)
// if it tries to create more containers than existing messages, we just create shortage
val actualNum =
Expand Down Expand Up @@ -202,7 +207,11 @@ class SchedulingDecisionMaker(
// this case is for that as a last resort.
case (Removing, Some(duration)) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
val containerThroughput = if (duration <= 0) {
maxActionConcurrency
} else {
(staleThreshold / duration) * maxActionConcurrency
}
val num = ceiling(staleActivationNum.toDouble / containerThroughput)
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
Expand Down