@@ -47,7 +47,7 @@ class SchedulingDecisionMakerTests
47
47
val testAction = " test-action"
48
48
val action = FullyQualifiedEntityName (EntityPath (testNamespace), EntityName (testAction), Some (SemVer (0 , 0 , 1 )))
49
49
50
- val schedulingConfig = SchedulingConfig (100 .milliseconds, 100 .milliseconds, 10 .seconds)
50
+ val schedulingConfig = SchedulingConfig (100 .milliseconds, 100 .milliseconds, 10 .seconds, false , 1.5 )
51
51
52
52
it should " decide pausing when the limit is less than equal to 0" in {
53
53
val decisionMaker = system.actorOf(SchedulingDecisionMaker .props(testNamespace, action, schedulingConfig))
@@ -149,7 +149,7 @@ class SchedulingDecisionMakerTests
149
149
}
150
150
}
151
151
152
- it should " enable namespace throttling with dropping msg when there is not enough capacity and no container" in {
152
+ it should " enable namespace throttling with dropping msg when there is not enough capacity, no container, and namespace over-provision disabled " in {
153
153
val decisionMaker = system.actorOf(SchedulingDecisionMaker .props(testNamespace, action, schedulingConfig))
154
154
val testProbe = TestProbe ()
155
155
@@ -173,7 +173,7 @@ class SchedulingDecisionMakerTests
173
173
testProbe.expectMsg(DecisionResults (EnableNamespaceThrottling (dropMsg = true ), 0 ))
174
174
}
175
175
176
- it should " enable namespace throttling without dropping msg when there is not enough capacity but are some containers" in {
176
+ it should " enable namespace throttling without dropping msg when there is not enough capacity but are some containers and namespace over-provision disabled " in {
177
177
val decisionMaker = system.actorOf(SchedulingDecisionMaker .props(testNamespace, action, schedulingConfig))
178
178
val testProbe = TestProbe ()
179
179
@@ -197,7 +197,142 @@ class SchedulingDecisionMakerTests
197
197
testProbe.expectMsg(DecisionResults (EnableNamespaceThrottling (dropMsg = false ), 0 ))
198
198
}
199
199
200
- it should " add an initial container if there is no any" in {
200
+ it should " add one container when there is no container, and namespace over-provision has capacity" in {
201
+ val schedulingConfigNamespaceOverProvisioning =
202
+ SchedulingConfig (100 .milliseconds, 100 .milliseconds, 10 .seconds, true , 1.5 )
203
+ val decisionMaker =
204
+ system.actorOf(SchedulingDecisionMaker .props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
205
+ val testProbe = TestProbe ()
206
+
207
+ val msg = QueueSnapshot (
208
+ initialized = false ,
209
+ incomingMsgCount = new AtomicInteger (0 ),
210
+ currentMsgCount = 0 ,
211
+ existingContainerCount = 0 , // there is no container for this action
212
+ inProgressContainerCount = 0 ,
213
+ staleActivationNum = 0 ,
214
+ existingContainerCountInNamespace = 1 , // but there are already 2 containers in this namespace
215
+ inProgressContainerCountInNamespace = 1 ,
216
+ averageDuration = None ,
217
+ limit = 2 ,
218
+ stateName = Running ,
219
+ recipient = testProbe.ref)
220
+
221
+ decisionMaker ! msg
222
+
223
+ // this queue cannot create an initial container so enable throttling and drop messages.
224
+ testProbe.expectMsg(DecisionResults (AddInitialContainer , 1 ))
225
+ }
226
+
227
+ it should " enable namespace throttling with dropping msg when there is no container, and namespace over-provision has no capacity" in {
228
+ val schedulingConfigNamespaceOverProvisioning =
229
+ SchedulingConfig (100 .milliseconds, 100 .milliseconds, 10 .seconds, true , 1.0 )
230
+ val decisionMaker =
231
+ system.actorOf(SchedulingDecisionMaker .props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
232
+ val testProbe = TestProbe ()
233
+
234
+ val msg = QueueSnapshot (
235
+ initialized = true ,
236
+ incomingMsgCount = new AtomicInteger (0 ),
237
+ currentMsgCount = 0 ,
238
+ existingContainerCount = 0 , // there is no container for this action
239
+ inProgressContainerCount = 0 ,
240
+ staleActivationNum = 0 ,
241
+ existingContainerCountInNamespace = 1 , // but there are already 2 containers in this namespace
242
+ inProgressContainerCountInNamespace = 1 ,
243
+ averageDuration = None ,
244
+ limit = 2 ,
245
+ stateName = Running ,
246
+ recipient = testProbe.ref)
247
+
248
+ decisionMaker ! msg
249
+
250
+ // this queue cannot create an initial container so enable throttling and drop messages.
251
+ testProbe.expectMsg(DecisionResults (EnableNamespaceThrottling (dropMsg = true ), 0 ))
252
+ }
253
+
254
+ it should " disable namespace throttling when namespace over-provision has capacity again" in {
255
+ val schedulingConfigNamespaceOverProvisioning =
256
+ SchedulingConfig (100 .milliseconds, 100 .milliseconds, 10 .seconds, true , 1.1 )
257
+ val decisionMaker =
258
+ system.actorOf(SchedulingDecisionMaker .props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
259
+ val testProbe = TestProbe ()
260
+
261
+ val msg = QueueSnapshot (
262
+ initialized = true ,
263
+ incomingMsgCount = new AtomicInteger (0 ),
264
+ currentMsgCount = 0 ,
265
+ existingContainerCount = 1 , // there is one container for this action
266
+ inProgressContainerCount = 0 ,
267
+ staleActivationNum = 0 ,
268
+ existingContainerCountInNamespace = 1 , // but there are already 2 containers in this namespace
269
+ inProgressContainerCountInNamespace = 1 ,
270
+ averageDuration = None ,
271
+ limit = 2 ,
272
+ stateName = NamespaceThrottled ,
273
+ recipient = testProbe.ref)
274
+
275
+ decisionMaker ! msg
276
+
277
+ // this queue cannot create an initial container so enable throttling and drop messages.
278
+ testProbe.expectMsg(DecisionResults (DisableNamespaceThrottling , 0 ))
279
+ }
280
+
281
+ it should " enable namespace throttling without dropping msg when there is a container, and namespace over-provision has no additional capacity" in {
282
+ val schedulingConfigNamespaceOverProvisioning =
283
+ SchedulingConfig (100 .milliseconds, 100 .milliseconds, 10 .seconds, true , 1.0 )
284
+ val decisionMaker =
285
+ system.actorOf(SchedulingDecisionMaker .props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
286
+ val testProbe = TestProbe ()
287
+
288
+ val msg = QueueSnapshot (
289
+ initialized = true ,
290
+ incomingMsgCount = new AtomicInteger (0 ),
291
+ currentMsgCount = 0 ,
292
+ existingContainerCount = 1 ,
293
+ inProgressContainerCount = 0 ,
294
+ staleActivationNum = 0 ,
295
+ existingContainerCountInNamespace = 1 , // but there are already 2 containers in this namespace
296
+ inProgressContainerCountInNamespace = 1 ,
297
+ averageDuration = None ,
298
+ limit = 2 ,
299
+ stateName = Running ,
300
+ recipient = testProbe.ref)
301
+
302
+ decisionMaker ! msg
303
+
304
+ // this queue cannot create an additional container so enable throttling and drop messages.
305
+ testProbe.expectMsg(DecisionResults (EnableNamespaceThrottling (dropMsg = false ), 0 ))
306
+ }
307
+
308
+ it should " not enable namespace throttling when there is not enough capacity but are some containers and namespace over-provision is enabled with capacity" in {
309
+ val schedulingConfigNamespaceOverProvisioning =
310
+ SchedulingConfig (100 .milliseconds, 100 .milliseconds, 10 .seconds, true , 1.5 )
311
+ val decisionMaker =
312
+ system.actorOf(SchedulingDecisionMaker .props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
313
+ val testProbe = TestProbe ()
314
+
315
+ val msg = QueueSnapshot (
316
+ initialized = true ,
317
+ incomingMsgCount = new AtomicInteger (0 ),
318
+ currentMsgCount = 0 ,
319
+ existingContainerCount = 1 , // there are some containers for this action
320
+ inProgressContainerCount = 1 ,
321
+ staleActivationNum = 0 ,
322
+ existingContainerCountInNamespace = 2 , // but there are already 2 containers in this namespace
323
+ inProgressContainerCountInNamespace = 2 , // this value includes the count of this action as well.
324
+ averageDuration = None ,
325
+ limit = 4 ,
326
+ stateName = Running ,
327
+ recipient = testProbe.ref)
328
+
329
+ decisionMaker ! msg
330
+
331
+ // this queue cannot create more containers
332
+ testProbe.expectNoMessage()
333
+ }
334
+
335
+ it should " add an initial container if there is not any" in {
201
336
val decisionMaker = system.actorOf(SchedulingDecisionMaker .props(testNamespace, action, schedulingConfig))
202
337
val testProbe = TestProbe ()
203
338
@@ -219,6 +354,7 @@ class SchedulingDecisionMakerTests
219
354
220
355
testProbe.expectMsg(DecisionResults (AddInitialContainer , 1 ))
221
356
}
357
+
222
358
it should " disable the namespace throttling with adding an initial container when there is no container" in {
223
359
val decisionMaker = system.actorOf(SchedulingDecisionMaker .props(testNamespace, action, schedulingConfig))
224
360
val testProbe = TestProbe ()
0 commit comments