Skip to content

optimize scheduling decision when there are stale activations #5344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,6 @@ class FunctionPullingContainerProxy(
case Event(Remove | GracefulShutdown, _) =>
stay()


case Event(DetermineKeepContainer(_), _) =>
stay()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,41 +135,44 @@ class SchedulingDecisionMaker(
staleActivationNum,
0.0,
Running)

case (Running, Some(duration)) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
val containerThroughput = staleThreshold / duration
val num = ceiling(availableMsg.toDouble / containerThroughput)
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = (if (num > availableMsg) availableMsg else num) - inProgress
addServersIfPossible(
existing,
inProgress,
containerThroughput,
availableMsg,
capacity,
actualNum,
staleActivationNum,
duration,
Running)

// need more containers and a message is already processed
case (Running, Some(duration)) =>
// we can safely get the value as we already checked the existence
val containerThroughput = staleThreshold / duration
val expectedTps = containerThroughput * (existing + inProgress)
val availableNonStaleActivations = availableMsg - staleActivationNum

var staleContainerProvision = 0
if (staleActivationNum > 0) {
val num = ceiling(staleActivationNum.toDouble / containerThroughput)
// if it tries to create more containers than existing messages, we just create shortage
staleContainerProvision = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
}

if (availableMsg >= expectedTps && existing + inProgress < availableMsg) {
val num = ceiling((availableMsg / containerThroughput) - existing - inProgress)
if (availableNonStaleActivations >= expectedTps && existing + inProgress < availableNonStaleActivations) {
val num = ceiling((availableNonStaleActivations / containerThroughput) - existing - inProgress)
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = if (num + totalContainers > availableMsg) availableMsg - totalContainers else num
val actualNum =
if (num + totalContainers > availableNonStaleActivations) availableNonStaleActivations - totalContainers
else num
addServersIfPossible(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a unit test to cover this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added two test cases to cover 1. when need to add containers both for stale messages and increase tps for non-stale available messages 2. when only need to add container to cover stale messages and tps is still met for non-stale messages.

Would appreciate approval on this one too when you have some time @style95

existing,
inProgress,
containerThroughput,
availableMsg,
capacity,
actualNum + staleContainerProvision,
staleActivationNum,
duration,
Running)
} else if (staleContainerProvision > 0) {
addServersIfPossible(
existing,
inProgress,
containerThroughput,
availableMsg,
capacity,
actualNum,
staleContainerProvision,
staleActivationNum,
duration,
Running)
Expand All @@ -184,9 +187,9 @@ class SchedulingDecisionMaker(
case (Removing, Some(duration)) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
val containerThroughput = staleThreshold / duration
val num = ceiling(availableMsg.toDouble / containerThroughput)
val num = ceiling(staleActivationNum.toDouble / containerThroughput)
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = (if (num > availableMsg) availableMsg else num) - inProgress
val actualNum = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
addServersIfPossible(
existing,
inProgress,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,54 @@ class SchedulingDecisionMakerTests
testProbe.expectMsg(DecisionResults(AddContainer, 2))
}

it should "add more containers when there are stale messages and non-stale messages and both message classes need more containers" in {
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()

val msg = QueueSnapshot(
initialized = true,
incomingMsgCount = new AtomicInteger(0),
currentMsgCount = 5,
existingContainerCount = 2,
inProgressContainerCount = 0,
staleActivationNum = 2,
existingContainerCountInNamespace = 2,
inProgressContainerCountInNamespace = 0,
averageDuration = Some(1000), // the average duration exists
limit = 10,
stateName = Running,
recipient = testProbe.ref)

decisionMaker ! msg

//should add two for the stale messages and one to increase tps of non-stale available messages
testProbe.expectMsg(DecisionResults(AddContainer, 3))
}

it should "add more containers when there are stale messages and non-stale messages have needed tps" in {
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()

val msg = QueueSnapshot(
initialized = true,
incomingMsgCount = new AtomicInteger(0),
currentMsgCount = 5,
existingContainerCount = 2,
inProgressContainerCount = 0,
staleActivationNum = 2,
existingContainerCountInNamespace = 2,
inProgressContainerCountInNamespace = 0,
averageDuration = Some(50), // the average duration gives container throughput of 2
limit = 10,
stateName = Running,
recipient = testProbe.ref)

decisionMaker ! msg

//should add one additional container for stale messages and non-stale messages still meet tps
testProbe.expectMsg(DecisionResults(AddContainer, 1))
}

it should "enable namespace throttling while adding more container when there are stale messages even in the GracefulShuttingDown" in {
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
Expand Down