Skip to content

Commit 077fb6d

Browse files
bdoyle0182 and Brendan Doyle authored
Add scheduler overprovision for new actions before namespace throttling (#5284)
* initial attempt
* tests
* fix tests
* enable throttling when last capacity used in overprovisioning
* add case to correctly disable namespace throttling when namespace overprovisioning has space
* feedback

Co-authored-by: Brendan Doyle <[email protected]>
1 parent 0f4b0c2 commit 077fb6d

File tree

5 files changed

+164
-9
lines changed

5 files changed

+164
-9
lines changed

core/scheduler/src/main/resources/application.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ whisk {
7979
stale-threshold = "100 milliseconds"
8080
check-interval = "100 milliseconds"
8181
drop-interval = "10 seconds"
82+
allow-over-provision-before-throttle = false
83+
namespace-over-provision-before-throttle-ratio = 1.5
8284
}
8385
queue {
8486
idle-grace = "20 seconds"

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,4 +421,8 @@ object SchedulerStates extends DefaultJsonProtocol {
421421
def parse(states: String) = Try(serdes.read(states.parseJson))
422422
}
423423

424-
case class SchedulingConfig(staleThreshold: FiniteDuration, checkInterval: FiniteDuration, dropInterval: FiniteDuration)
424+
case class SchedulingConfig(staleThreshold: FiniteDuration,
425+
checkInterval: FiniteDuration,
426+
dropInterval: FiniteDuration,
427+
allowOverProvisionBeforeThrottle: Boolean,
428+
namespaceOverProvisionBeforeThrottleRatio: Double)

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,17 @@ class SchedulingDecisionMaker(
7070
case _ => Future.successful(DecisionResults(Pausing, 0))
7171
}
7272
} else {
73-
val capacity = limit - existingContainerCountInNs - inProgressContainerCountInNs
73+
val capacity = if (schedulingConfig.allowOverProvisionBeforeThrottle && totalContainers == 0) {
74+
// if space available within the over provision ratio amount above namespace limit, create one container for new
75+
// action so namespace traffic can attempt to re-balance without blocking entire action
76+
if ((ceiling(limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs) > 0) {
77+
1
78+
} else {
79+
0
80+
}
81+
} else {
82+
limit - existingContainerCountInNs - inProgressContainerCountInNs
83+
}
7484
if (capacity <= 0) {
7585
stateName match {
7686

@@ -79,12 +89,15 @@ class SchedulingDecisionMaker(
7989
*
8090
* However, if the container exists(totalContainers != 0), the activation is not treated as a failure and the activation is delivered to the container.
8191
*/
82-
case Running =>
92+
case Running
93+
if !schedulingConfig.allowOverProvisionBeforeThrottle || (schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(
94+
limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs <= 0) =>
8395
logging.info(
8496
this,
8597
s"there is no capacity activations will be dropped or throttled, (availableMsg: $availableMsg totalContainers: $totalContainers, limit: $limit, namespaceContainers: ${existingContainerCountInNs}, namespaceInProgressContainer: ${inProgressContainerCountInNs}) [$invocationNamespace:$action]")
8698
Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = totalContainers == 0), 0))
87-
99+
case NamespaceThrottled if schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
100+
Future.successful(DecisionResults(DisableNamespaceThrottling, 0))
88101
// do nothing
89102
case _ =>
90103
// no need to print any messages if the state is already NamespaceThrottled

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ class MemoryQueueTestsFixture
8686
val testNamespace = "test-namespace"
8787
val testAction = "test-action"
8888

89-
val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds)
89+
val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, false, 1.5)
9090

9191
val fqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 1)))
9292
val revision = DocRevision("1-testRev")

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala

Lines changed: 140 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class SchedulingDecisionMakerTests
4747
val testAction = "test-action"
4848
val action = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 1)))
4949

50-
val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds)
50+
val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, false, 1.5)
5151

5252
it should "decide pausing when the limit is less than equal to 0" in {
5353
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
@@ -149,7 +149,7 @@ class SchedulingDecisionMakerTests
149149
}
150150
}
151151

152-
it should "enable namespace throttling with dropping msg when there is not enough capacity and no container" in {
152+
it should "enable namespace throttling with dropping msg when there is not enough capacity, no container, and namespace over-provision disabled" in {
153153
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
154154
val testProbe = TestProbe()
155155

@@ -173,7 +173,7 @@ class SchedulingDecisionMakerTests
173173
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = true), 0))
174174
}
175175

176-
it should "enable namespace throttling without dropping msg when there is not enough capacity but are some containers" in {
176+
it should "enable namespace throttling without dropping msg when there is not enough capacity but are some containers and namespace over-provision disabled" in {
177177
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
178178
val testProbe = TestProbe()
179179

@@ -197,7 +197,142 @@ class SchedulingDecisionMakerTests
197197
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = false), 0))
198198
}
199199

200-
it should "add an initial container if there is no any" in {
200+
it should "add one container when there is no container, and namespace over-provision has capacity" in {
201+
val schedulingConfigNamespaceOverProvisioning =
202+
SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.5)
203+
val decisionMaker =
204+
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
205+
val testProbe = TestProbe()
206+
207+
val msg = QueueSnapshot(
208+
initialized = false,
209+
incomingMsgCount = new AtomicInteger(0),
210+
currentMsgCount = 0,
211+
existingContainerCount = 0, // there is no container for this action
212+
inProgressContainerCount = 0,
213+
staleActivationNum = 0,
214+
existingContainerCountInNamespace = 1, // but there are already 2 containers in this namespace
215+
inProgressContainerCountInNamespace = 1,
216+
averageDuration = None,
217+
limit = 2,
218+
stateName = Running,
219+
recipient = testProbe.ref)
220+
221+
decisionMaker ! msg
222+
223+
// the namespace limit is already reached, but the over-provision ratio leaves spare capacity, so one initial container is added for this new action.
224+
testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
225+
}
226+
227+
it should "enable namespace throttling with dropping msg when there is no container, and namespace over-provision has no capacity" in {
228+
val schedulingConfigNamespaceOverProvisioning =
229+
SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.0)
230+
val decisionMaker =
231+
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
232+
val testProbe = TestProbe()
233+
234+
val msg = QueueSnapshot(
235+
initialized = true,
236+
incomingMsgCount = new AtomicInteger(0),
237+
currentMsgCount = 0,
238+
existingContainerCount = 0, // there is no container for this action
239+
inProgressContainerCount = 0,
240+
staleActivationNum = 0,
241+
existingContainerCountInNamespace = 1, // but there are already 2 containers in this namespace
242+
inProgressContainerCountInNamespace = 1,
243+
averageDuration = None,
244+
limit = 2,
245+
stateName = Running,
246+
recipient = testProbe.ref)
247+
248+
decisionMaker ! msg
249+
250+
// this queue cannot create an initial container so enable throttling and drop messages.
251+
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = true), 0))
252+
}
253+
254+
it should "disable namespace throttling when namespace over-provision has capacity again" in {
255+
val schedulingConfigNamespaceOverProvisioning =
256+
SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.1)
257+
val decisionMaker =
258+
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
259+
val testProbe = TestProbe()
260+
261+
val msg = QueueSnapshot(
262+
initialized = true,
263+
incomingMsgCount = new AtomicInteger(0),
264+
currentMsgCount = 0,
265+
existingContainerCount = 1, // there is one container for this action
266+
inProgressContainerCount = 0,
267+
staleActivationNum = 0,
268+
existingContainerCountInNamespace = 1, // but there are already 2 containers in this namespace
269+
inProgressContainerCountInNamespace = 1,
270+
averageDuration = None,
271+
limit = 2,
272+
stateName = NamespaceThrottled,
273+
recipient = testProbe.ref)
274+
275+
decisionMaker ! msg
276+
277+
// the namespace is throttled, but the over-provision capacity (limit 2 x ratio 1.1, rounded up to 3) exceeds the 2 containers in use, so throttling is disabled.
278+
testProbe.expectMsg(DecisionResults(DisableNamespaceThrottling, 0))
279+
}
280+
281+
it should "enable namespace throttling without dropping msg when there is a container, and namespace over-provision has no additional capacity" in {
282+
val schedulingConfigNamespaceOverProvisioning =
283+
SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.0)
284+
val decisionMaker =
285+
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
286+
val testProbe = TestProbe()
287+
288+
val msg = QueueSnapshot(
289+
initialized = true,
290+
incomingMsgCount = new AtomicInteger(0),
291+
currentMsgCount = 0,
292+
existingContainerCount = 1,
293+
inProgressContainerCount = 0,
294+
staleActivationNum = 0,
295+
existingContainerCountInNamespace = 1, // but there are already 2 containers in this namespace
296+
inProgressContainerCountInNamespace = 1,
297+
averageDuration = None,
298+
limit = 2,
299+
stateName = Running,
300+
recipient = testProbe.ref)
301+
302+
decisionMaker ! msg
303+
304+
// this queue cannot create an additional container, so enable throttling; a container already exists, so messages are not dropped.
305+
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = false), 0))
306+
}
307+
308+
it should "not enable namespace throttling when there is not enough capacity but are some containers and namespace over-provision is enabled with capacity" in {
309+
val schedulingConfigNamespaceOverProvisioning =
310+
SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.5)
311+
val decisionMaker =
312+
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
313+
val testProbe = TestProbe()
314+
315+
val msg = QueueSnapshot(
316+
initialized = true,
317+
incomingMsgCount = new AtomicInteger(0),
318+
currentMsgCount = 0,
319+
existingContainerCount = 1, // there are some containers for this action
320+
inProgressContainerCount = 1,
321+
staleActivationNum = 0,
322+
existingContainerCountInNamespace = 2, // but there are already 2 containers in this namespace
323+
inProgressContainerCountInNamespace = 2, // this value includes the count of this action as well.
324+
averageDuration = None,
325+
limit = 4,
326+
stateName = Running,
327+
recipient = testProbe.ref)
328+
329+
decisionMaker ! msg
330+
331+
// this queue cannot create more containers
332+
testProbe.expectNoMessage()
333+
}
334+
335+
it should "add an initial container if there is not any" in {
201336
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
202337
val testProbe = TestProbe()
203338

@@ -219,6 +354,7 @@ class SchedulingDecisionMakerTests
219354

220355
testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
221356
}
357+
222358
it should "disable the namespace throttling with adding an initial container when there is no container" in {
223359
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
224360
val testProbe = TestProbe()

0 commit comments

Comments
 (0)