Skip to content

Commit f8ce7c1

Browse files
bdoyle0182Brendan Doyle
authored andcommitted
add config to fail async scheduler throttles as whisk errors (apache#5305)
* add config to fail async scheduler throttles as whisk errors * fix tests Co-authored-by: Brendan Doyle <[email protected]>
1 parent 9e1cc33 commit f8ce7c1

File tree

5 files changed

+13
-11
lines changed

5 files changed

+13
-11
lines changed

core/scheduler/src/main/resources/application.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ whisk {
7979
max-blackbox-retention-ms = "300000"
8080
throttling-fraction = "0.9"
8181
duration-buffer-size = "10"
82+
fail-throttle-as-whisk-error = "false"
8283
}
8384
queue-manager {
8485
max-scheduling-time = "20 seconds"

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
227227
enableNamespaceThrottling()
228228

229229
if (dropMsg)
230-
completeAllActivations(tooManyConcurrentRequests, isWhiskError = false)
230+
completeAllActivations(tooManyConcurrentRequests, isWhiskError = queueConfig.failThrottleAsWhiskError)
231231
goto(NamespaceThrottled) using ThrottledData(data.schedulerActor, data.droppingActor)
232232

233233
case Event(StateTimeout, data: RunningData) =>
@@ -269,7 +269,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
269269
when(NamespaceThrottled) {
270270
case Event(msg: ActivationMessage, _: ThrottledData) =>
271271
if (containers.size + creationIds.size == 0) {
272-
completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = false)
272+
completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = queueConfig.failThrottleAsWhiskError)
273273
} else {
274274
handleActivationMessage(msg)
275275
}
@@ -285,7 +285,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
285285
when(ActionThrottled) {
286286
// since there are already too many activation messages, it drops the new messages
287287
case Event(msg: ActivationMessage, ThrottledData(_, _)) =>
288-
completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = false)
288+
completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = queueConfig.failThrottleAsWhiskError)
289289
stay
290290
}
291291

@@ -823,7 +823,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
823823
CompletionMessage(activation.transid, activationResponse, instance)
824824
}
825825

826-
if (!isWhiskError && message == tooManyConcurrentRequests) {
826+
if (message == tooManyConcurrentRequests) {
827827
val metric = Metric("ConcurrentRateLimit", 1)
828828
UserEvents.send(
829829
messagingProducer,
@@ -1208,7 +1208,8 @@ case class QueueConfig(idleGrace: FiniteDuration,
12081208
maxRetentionMs: Long,
12091209
maxBlackboxRetentionMs: Long,
12101210
throttlingFraction: Double,
1211-
durationBufferSize: Int)
1211+
durationBufferSize: Int,
1212+
failThrottleAsWhiskError: Boolean)
12121213

12131214
case class BufferedRequest(containerId: String, promise: Promise[Either[MemoryQueueError, ActivationMessage]])
12141215
case object DropOld

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ class MemoryQueueFlowTests
517517

518518
// max retention size is 10 and throttling fraction is 0.8
519519
// queue will be action throttled at 10 messages and disabled action throttling at 8 messages
520-
val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 5000, 10000, 0.8, 10)
520+
val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 5000, 10000, 0.8, 10, false)
521521

522522
// limit is 1
523523
val getUserLimit = (_: String) => Future.successful(1)

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,7 +1116,7 @@ class MemoryQueueTests
11161116

11171117
expectDurationChecking(mockEsClient, testInvocationNamespace)
11181118

1119-
val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10)
1119+
val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10, false)
11201120

11211121
val fsm =
11221122
TestFSMRef(
@@ -1342,7 +1342,7 @@ class MemoryQueueTests
13421342

13431343
expectDurationChecking(mockEsClient, testInvocationNamespace)
13441344

1345-
val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10)
1345+
val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10, false)
13461346

13471347
val fsm =
13481348
TestFSMRef(
@@ -1585,7 +1585,7 @@ class MemoryQueueTests
15851585
// it always induces the throttling
15861586
val getZeroLimit = (_: String) => { Future.successful(2) }
15871587

1588-
val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 1, 5000, 10000, 0.9, 10)
1588+
val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 1, 5000, 10000, 0.9, 10, false)
15891589

15901590
expectDurationChecking(mockEsClient, testInvocationNamespace)
15911591

@@ -1632,7 +1632,7 @@ class MemoryQueueTests
16321632
val probe = TestProbe()
16331633
val parent = TestProbe()
16341634

1635-
val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 5000, 10000, 0.9, 10)
1635+
val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 seconds, 10, 5000, 10000, 0.9, 10, false)
16361636
val msgRetentionSize = queueConfig.maxRetentionSize
16371637

16381638
val tid = TransactionId(TransactionId.generateTid())

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ class MemoryQueueTestsFixture
154154
val actionThrottlingKey = ThrottlingKeys.action(testInvocationNamespace, fqn.copy(version = None))
155155

156156
// queue variables
157-
val queueConfig = QueueConfig(5 seconds, 10 seconds, 5 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10)
157+
val queueConfig = QueueConfig(5 seconds, 10 seconds, 5 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10, false)
158158
val idleGrace = queueConfig.idleGrace
159159
val stopGrace = queueConfig.stopGrace
160160
val flushGrace = queueConfig.flushGrace

0 commit comments

Comments
 (0)