Skip to content

Commit 415ae98

Browse files
bdoyle0182Brendan Doyle
andauthored
make scheduler consider action concurrency >1 (#5378)
Co-authored-by: Brendan Doyle <[email protected]>
1 parent 9d96c6d commit 415ae98

File tree

5 files changed

+153
-7
lines changed

5 files changed

+153
-7
lines changed

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -943,6 +943,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
943943
namespaceContainerCount.inProgressContainerNumByNamespace,
944944
averageDuration,
945945
limit,
946+
actionMetaData.limits.concurrency.maxConcurrent,
946947
stateName,
947948
self)
948949
case Failure(_: NoDocumentException) =>
@@ -1222,6 +1223,7 @@ case class QueueSnapshot(initialized: Boolean,
12221223
inProgressContainerCountInNamespace: Int,
12231224
averageDuration: Option[Double],
12241225
limit: Int,
1226+
maxActionConcurrency: Int,
12251227
stateName: MemoryQueueState,
12261228
recipient: ActorRef)
12271229

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class SchedulingDecisionMaker(
5858
inProgressContainerCountInNs,
5959
averageDuration,
6060
limit,
61+
maxActionConcurrency,
6162
stateName,
6263
_) = snapshot
6364
val totalContainers = existing + inProgress
@@ -137,7 +138,7 @@ class SchedulingDecisionMaker(
137138
// but it is a kind of trade-off and we place latency on top of over-provisioning
138139
case (Running, None) if staleActivationNum > 0 =>
139140
// we can safely get the value as we already checked the existence
140-
val num = staleActivationNum - inProgress
141+
val num = ceiling(staleActivationNum.toDouble / maxActionConcurrency.toDouble) - inProgress
141142
// if it tries to create more containers than existing messages, we just create shortage
142143
val actualNum = if (num > availableMsg) availableMsg else num
143144
addServersIfPossible(
@@ -153,7 +154,7 @@ class SchedulingDecisionMaker(
153154
// need more containers and a message is already processed
154155
case (Running, Some(duration)) =>
155156
// we can safely get the value as we already checked the existence
156-
val containerThroughput = staleThreshold / duration
157+
val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
157158
val expectedTps = containerThroughput * (existing + inProgress)
158159
val availableNonStaleActivations = availableMsg - staleActivationNum
159160

@@ -201,7 +202,7 @@ class SchedulingDecisionMaker(
201202
// this case is for that as a last resort.
202203
case (Removing, Some(duration)) if staleActivationNum > 0 =>
203204
// we can safely get the value as we already checked the existence
204-
val containerThroughput = staleThreshold / duration
205+
val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
205206
val num = ceiling(staleActivationNum.toDouble / containerThroughput)
206207
// if it tries to create more containers than existing messages, we just create shortage
207208
val actualNum = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
@@ -219,7 +220,7 @@ class SchedulingDecisionMaker(
219220
// same with the above case but no duration exist.
220221
case (Removing, None) if staleActivationNum > 0 =>
221222
// we can safely get the value as we already checked the existence
222-
val num = staleActivationNum - inProgress
223+
val num = ceiling(staleActivationNum.toDouble / maxActionConcurrency.toDouble) - inProgress
223224
// if it tries to create more containers than existing messages, we just create shortage
224225
val actualNum = if (num > availableMsg) availableMsg else num
225226
addServersIfPossible(

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ class MemoryQueueFlowTests
313313
// this is the case where there is no capacity in a namespace and no container can be created.
314314
decisionMaker.setAutoPilot((sender: ActorRef, msg) => {
315315
msg match {
316-
case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, Running, _) =>
316+
case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, Running, _) =>
317317
sender ! DecisionResults(EnableNamespaceThrottling(true), 0)
318318
TestActor.KeepRunning
319319

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1534,7 +1534,7 @@ class MemoryQueueTests
15341534
// This test pilot mimic the decision maker who disable the namespace throttling when there is enough capacity.
15351535
decisionMaker.setAutoPilot((sender: ActorRef, msg) => {
15361536
msg match {
1537-
case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, NamespaceThrottled, _) =>
1537+
case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, NamespaceThrottled, _) =>
15381538
sender ! DisableNamespaceThrottling
15391539

15401540
case _ =>

0 commit comments

Comments
 (0)