@@ -153,19 +153,24 @@ class SchedulingDecisionMaker(
153
153
Running )
154
154
// need more containers and a message is already processed
155
155
case (Running , Some (duration)) =>
156
- // we can safely get the value as we already checked the existence
157
- val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
156
+ // we can safely get the value as we already checked the existence, have extra protection in case duration is somehow negative
157
+ val containerThroughput = if (duration <= 0 ) {
158
+ maxActionConcurrency
159
+ } else {
160
+ (staleThreshold / duration) * maxActionConcurrency
161
+ }
158
162
val expectedTps = containerThroughput * (existing + inProgress)
159
163
val availableNonStaleActivations = availableMsg - staleActivationNum
160
164
161
165
var staleContainerProvision = 0
162
166
if (staleActivationNum > 0 ) {
167
+
163
168
val num = ceiling(staleActivationNum.toDouble / containerThroughput)
164
169
// if it tries to create more containers than existing messages, we just create shortage
165
170
staleContainerProvision = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
166
171
}
167
172
168
- if (availableNonStaleActivations >= expectedTps && existing + inProgress < availableNonStaleActivations) {
173
+ if (availableNonStaleActivations >= expectedTps && existing + inProgress < availableNonStaleActivations && duration > 0 ) {
169
174
val num = ceiling((availableNonStaleActivations / containerThroughput) - existing - inProgress)
170
175
// if it tries to create more containers than existing messages, we just create shortage
171
176
val actualNum =
@@ -202,7 +207,11 @@ class SchedulingDecisionMaker(
202
207
// this case is for that as a last resort.
203
208
case (Removing , Some (duration)) if staleActivationNum > 0 =>
204
209
// we can safely get the value as we already checked the existence
205
- val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
210
+ val containerThroughput = if (duration <= 0 ) {
211
+ maxActionConcurrency
212
+ } else {
213
+ (staleThreshold / duration) * maxActionConcurrency
214
+ }
206
215
val num = ceiling(staleActivationNum.toDouble / containerThroughput)
207
216
// if it tries to create more containers than existing messages, we just create shortage
208
217
val actualNum = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
0 commit comments