Skip to content

Commit 60406cc

Browse files
bdoyle0182Brendan Doyle
authored andcommitted
optimize scheduling decision when there are stale activations (apache#5344)
* optimize scheduling decision when there are stale activations * further optimization * scalafmt * add new test cases Co-authored-by: Brendan Doyle <[email protected]>
1 parent f85e194 commit 60406cc

File tree

2 files changed

+75
-24
lines changed

2 files changed

+75
-24
lines changed

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -135,41 +135,44 @@ class SchedulingDecisionMaker(
135135
staleActivationNum,
136136
0.0,
137137
Running)
138-
139-
case (Running, Some(duration)) if staleActivationNum > 0 =>
140-
// we can safely get the value as we already checked the existence
141-
val containerThroughput = staleThreshold / duration
142-
val num = ceiling(availableMsg.toDouble / containerThroughput)
143-
// if it tries to create more containers than existing messages, we just create shortage
144-
val actualNum = (if (num > availableMsg) availableMsg else num) - inProgress
145-
addServersIfPossible(
146-
existing,
147-
inProgress,
148-
containerThroughput,
149-
availableMsg,
150-
capacity,
151-
actualNum,
152-
staleActivationNum,
153-
duration,
154-
Running)
155-
156138
// need more containers and a message is already processed
157139
case (Running, Some(duration)) =>
158140
// we can safely get the value as we already checked the existence
159141
val containerThroughput = staleThreshold / duration
160142
val expectedTps = containerThroughput * (existing + inProgress)
143+
val availableNonStaleActivations = availableMsg - staleActivationNum
144+
145+
var staleContainerProvision = 0
146+
if (staleActivationNum > 0) {
147+
val num = ceiling(staleActivationNum.toDouble / containerThroughput)
148+
// if it tries to create more containers than existing messages, we just create shortage
149+
staleContainerProvision = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
150+
}
161151

162-
if (availableMsg >= expectedTps && existing + inProgress < availableMsg) {
163-
val num = ceiling((availableMsg / containerThroughput) - existing - inProgress)
152+
if (availableNonStaleActivations >= expectedTps && existing + inProgress < availableNonStaleActivations) {
153+
val num = ceiling((availableNonStaleActivations / containerThroughput) - existing - inProgress)
164154
// if it tries to create more containers than existing messages, we just create shortage
165-
val actualNum = if (num + totalContainers > availableMsg) availableMsg - totalContainers else num
155+
val actualNum =
156+
if (num + totalContainers > availableNonStaleActivations) availableNonStaleActivations - totalContainers
157+
else num
158+
addServersIfPossible(
159+
existing,
160+
inProgress,
161+
containerThroughput,
162+
availableMsg,
163+
capacity,
164+
actualNum + staleContainerProvision,
165+
staleActivationNum,
166+
duration,
167+
Running)
168+
} else if (staleContainerProvision > 0) {
166169
addServersIfPossible(
167170
existing,
168171
inProgress,
169172
containerThroughput,
170173
availableMsg,
171174
capacity,
172-
actualNum,
175+
staleContainerProvision,
173176
staleActivationNum,
174177
duration,
175178
Running)
@@ -184,9 +187,9 @@ class SchedulingDecisionMaker(
184187
case (Removing, Some(duration)) if staleActivationNum > 0 =>
185188
// we can safely get the value as we already checked the existence
186189
val containerThroughput = staleThreshold / duration
187-
val num = ceiling(availableMsg.toDouble / containerThroughput)
190+
val num = ceiling(staleActivationNum.toDouble / containerThroughput)
188191
// if it tries to create more containers than existing messages, we just create shortage
189-
val actualNum = (if (num > availableMsg) availableMsg else num) - inProgress
192+
val actualNum = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
190193
addServersIfPossible(
191194
existing,
192195
inProgress,

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,54 @@ class SchedulingDecisionMakerTests
683683
testProbe.expectMsg(DecisionResults(AddContainer, 2))
684684
}
685685

686+
it should "add more containers when there are stale messages and non-stale messages and both message classes need more containers" in {
687+
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
688+
val testProbe = TestProbe()
689+
690+
val msg = QueueSnapshot(
691+
initialized = true,
692+
incomingMsgCount = new AtomicInteger(0),
693+
currentMsgCount = 5,
694+
existingContainerCount = 2,
695+
inProgressContainerCount = 0,
696+
staleActivationNum = 2,
697+
existingContainerCountInNamespace = 2,
698+
inProgressContainerCountInNamespace = 0,
699+
averageDuration = Some(1000), // the average duration exists
700+
limit = 10,
701+
stateName = Running,
702+
recipient = testProbe.ref)
703+
704+
decisionMaker ! msg
705+
706+
//should add two for the stale messages and one to increase tps of non-stale available messages
707+
testProbe.expectMsg(DecisionResults(AddContainer, 3))
708+
}
709+
710+
it should "add more containers when there are stale messages and non-stale messages have needed tps" in {
711+
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
712+
val testProbe = TestProbe()
713+
714+
val msg = QueueSnapshot(
715+
initialized = true,
716+
incomingMsgCount = new AtomicInteger(0),
717+
currentMsgCount = 5,
718+
existingContainerCount = 2,
719+
inProgressContainerCount = 0,
720+
staleActivationNum = 2,
721+
existingContainerCountInNamespace = 2,
722+
inProgressContainerCountInNamespace = 0,
723+
averageDuration = Some(50), // the average duration gives container throughput of 2
724+
limit = 10,
725+
stateName = Running,
726+
recipient = testProbe.ref)
727+
728+
decisionMaker ! msg
729+
730+
//should add one additional container for stale messages and non-stale messages still meet tps
731+
testProbe.expectMsg(DecisionResults(AddContainer, 1))
732+
}
733+
686734
it should "enable namespace throttling while adding more container when there are stale messages even in the GracefulShuttingDown" in {
687735
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
688736
val testProbe = TestProbe()

0 commit comments

Comments
 (0)