17
17
18
18
package org .apache .openwhisk .core .scheduler .queue
19
19
20
- import java .time .{Duration , Instant }
21
- import java .util .concurrent .atomic .AtomicInteger
22
20
import akka .actor .Status .{Failure => FailureMessage }
23
21
import akka .actor .{ActorRef , ActorSystem , Cancellable , FSM , Props , Stash }
24
22
import akka .util .Timeout
25
23
import org .apache .openwhisk .common ._
26
24
import org .apache .openwhisk .core .ConfigKeys
27
25
import org .apache .openwhisk .core .ack .ActiveAck
28
- import org .apache .openwhisk .core .connector .ContainerCreationError .{ TooManyConcurrentRequests , ZeroNamespaceLimit }
26
+ import org .apache .openwhisk .core .connector .ContainerCreationError .ZeroNamespaceLimit
29
27
import org .apache .openwhisk .core .connector ._
30
28
import org .apache .openwhisk .core .containerpool .Interval
31
29
import org .apache .openwhisk .core .database .{NoDocumentException , UserContext }
@@ -44,10 +42,12 @@ import org.apache.openwhisk.core.scheduler.message.{
44
42
import org .apache .openwhisk .core .scheduler .{SchedulerEndpoints , SchedulingConfig }
45
43
import org .apache .openwhisk .core .service ._
46
44
import org .apache .openwhisk .http .Messages .{namespaceLimitUnderZero , tooManyConcurrentRequests }
47
- import pureconfig .generic .auto ._
48
45
import pureconfig .loadConfigOrThrow
49
46
import spray .json ._
47
+ import pureconfig .generic .auto ._
50
48
49
+ import java .time .{Duration , Instant }
50
+ import java .util .concurrent .atomic .AtomicInteger
51
51
import scala .annotation .tailrec
52
52
import scala .collection .immutable .Queue
53
53
import scala .collection .mutable
@@ -224,18 +224,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
224
224
logging.info(this , s " [ $invocationNamespace: $action: $stateName] Enable namespace throttling. " )
225
225
enableNamespaceThrottling()
226
226
227
- // if no container could be created, it is same with Flushing state.
228
- if (dropMsg) {
227
+ if (dropMsg)
229
228
completeAllActivations(tooManyConcurrentRequests, isWhiskError = false )
230
- goto(Flushing ) using FlushingData (
231
- data.schedulerActor,
232
- data.droppingActor,
233
- TooManyConcurrentRequests ,
234
- tooManyConcurrentRequests)
235
- } else {
236
- // if there are already some containers running, activations can still be processed so goto the NamespaceThrottled state.
237
- goto(NamespaceThrottled ) using ThrottledData (data.schedulerActor, data.droppingActor)
238
- }
229
+ goto(NamespaceThrottled ) using ThrottledData (data.schedulerActor, data.droppingActor)
239
230
240
231
case Event (StateTimeout , data : RunningData ) =>
241
232
if (queue.isEmpty && (containers.size + creationIds.size) <= 0 ) {
0 commit comments