Skip to content

Commit 8fd2156

Browse files
authored
Go to the NamespaceThrottled state rather than Flushing state. (#5303)
* Currently MemoryQueue will go to Flushing state when receive a EnableNamespaceThrottling(dropMsg=true) message, but the Flushing state doesn't have a case to disable namespace throttling at all. * Remove unused import.
1 parent cf127f9 commit 8fd2156

File tree

3 files changed

+15
-20
lines changed

3 files changed

+15
-20
lines changed

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,13 @@
1717

1818
package org.apache.openwhisk.core.scheduler.queue
1919

20-
import java.time.{Duration, Instant}
21-
import java.util.concurrent.atomic.AtomicInteger
2220
import akka.actor.Status.{Failure => FailureMessage}
2321
import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
2422
import akka.util.Timeout
2523
import org.apache.openwhisk.common._
2624
import org.apache.openwhisk.core.ConfigKeys
2725
import org.apache.openwhisk.core.ack.ActiveAck
28-
import org.apache.openwhisk.core.connector.ContainerCreationError.{TooManyConcurrentRequests, ZeroNamespaceLimit}
26+
import org.apache.openwhisk.core.connector.ContainerCreationError.ZeroNamespaceLimit
2927
import org.apache.openwhisk.core.connector._
3028
import org.apache.openwhisk.core.containerpool.Interval
3129
import org.apache.openwhisk.core.database.{NoDocumentException, UserContext}
@@ -44,10 +42,12 @@ import org.apache.openwhisk.core.scheduler.message.{
4442
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig}
4543
import org.apache.openwhisk.core.service._
4644
import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests}
47-
import pureconfig.generic.auto._
4845
import pureconfig.loadConfigOrThrow
4946
import spray.json._
47+
import pureconfig.generic.auto._
5048

49+
import java.time.{Duration, Instant}
50+
import java.util.concurrent.atomic.AtomicInteger
5151
import scala.annotation.tailrec
5252
import scala.collection.immutable.Queue
5353
import scala.collection.mutable
@@ -224,18 +224,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
224224
logging.info(this, s"[$invocationNamespace:$action:$stateName] Enable namespace throttling.")
225225
enableNamespaceThrottling()
226226

227-
// if no container could be created, it is same with Flushing state.
228-
if (dropMsg) {
227+
if (dropMsg)
229228
completeAllActivations(tooManyConcurrentRequests, isWhiskError = false)
230-
goto(Flushing) using FlushingData(
231-
data.schedulerActor,
232-
data.droppingActor,
233-
TooManyConcurrentRequests,
234-
tooManyConcurrentRequests)
235-
} else {
236-
// if there are already some containers running, activations can still be processed so goto the NamespaceThrottled state.
237-
goto(NamespaceThrottled) using ThrottledData(data.schedulerActor, data.droppingActor)
238-
}
229+
goto(NamespaceThrottled) using ThrottledData(data.schedulerActor, data.droppingActor)
239230

240231
case Event(StateTimeout, data: RunningData) =>
241232
if (queue.isEmpty && (containers.size + creationIds.size) <= 0) {

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ class MemoryQueueFlowTests
280280
probe.expectTerminated(fsm, 10.seconds)
281281
}
282282

283-
it should "go to the Flushing state dropping messages when it can't create an initial container" in {
283+
it should "go to the NamespaceThrottled state dropping messages when it can't create an initial container" in {
284284
val mockEtcdClient = mock[EtcdClient]
285285
val parent = TestProbe()
286286
val watcher = TestProbe()
@@ -340,7 +340,7 @@ class MemoryQueueFlowTests
340340
fsm ! message
341341

342342
dataMgmtService.expectMsg(RegisterData(namespaceThrottlingKey, true.toString, failoverEnabled = false))
343-
probe.expectMsg(Transition(fsm, Running, Flushing))
343+
probe.expectMsg(Transition(fsm, Running, NamespaceThrottled))
344344

345345
awaitAssert({
346346
ackedMessageCount shouldBe 1
@@ -352,13 +352,17 @@ class MemoryQueueFlowTests
352352
fsm.underlyingActor.queue.size shouldBe 0
353353
}, 5.seconds)
354354

355-
parent.expectMsg(flushGrace * 2 + 5.seconds, queueRemovedMsg)
356-
probe.expectMsg(Transition(fsm, Flushing, Removed))
355+
fsm ! GracefulShutdown
356+
357+
parent.expectMsg(queueRemovedMsg)
358+
probe.expectMsg(Transition(fsm, NamespaceThrottled, Removing))
357359

358360
fsm ! QueueRemovedCompleted
359361

360362
expectDataCleanUp(watcher, dataMgmtService)
361363

364+
probe.expectMsg(Transition(fsm, Removing, Removed))
365+
362366
probe.expectTerminated(fsm, 10.seconds)
363367
}
364368

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1357,7 +1357,7 @@ class MemoryQueueTests
13571357
parent.expectMsg(10 seconds, Transition(fsm, Uninitialized, Running))
13581358

13591359
fsm ! EnableNamespaceThrottling(dropMsg = true)
1360-
parent.expectMsg(10 seconds, Transition(fsm, Running, Flushing))
1360+
parent.expectMsg(10 seconds, Transition(fsm, Running, NamespaceThrottled))
13611361
dataManagementService.expectMsg(RegisterData(namespaceThrottlingKey, true.toString, false))
13621362

13631363
fsm.stop()

0 commit comments

Comments
 (0)