@@ -35,7 +35,12 @@ import org.apache.openwhisk.core.etcd.EtcdClient
35
35
import org .apache .openwhisk .core .etcd .EtcdKV .ContainerKeys .containerPrefix
36
36
import org .apache .openwhisk .core .etcd .EtcdKV .{ContainerKeys , QueueKeys , ThrottlingKeys }
37
37
import org .apache .openwhisk .core .scheduler .grpc .{GetActivation , ActivationResponse => GetActivationResponse }
38
- import org .apache .openwhisk .core .scheduler .message .{ContainerCreation , ContainerDeletion , FailedCreationJob , SuccessfulCreationJob }
38
+ import org .apache .openwhisk .core .scheduler .message .{
39
+ ContainerCreation ,
40
+ ContainerDeletion ,
41
+ FailedCreationJob ,
42
+ SuccessfulCreationJob
43
+ }
39
44
import org .apache .openwhisk .core .scheduler .{SchedulerEndpoints , SchedulingConfig }
40
45
import org .apache .openwhisk .core .service ._
41
46
import org .apache .openwhisk .http .Messages .{namespaceLimitUnderZero , tooManyConcurrentRequests }
@@ -47,7 +52,7 @@ import scala.annotation.tailrec
47
52
import scala .collection .immutable .Queue
48
53
import scala .collection .mutable
49
54
import scala .concurrent .duration ._
50
- import scala .concurrent .{ExecutionContextExecutor , Future , Promise , duration }
55
+ import scala .concurrent .{duration , ExecutionContextExecutor , Future , Promise }
51
56
import scala .util .{Failure , Success }
52
57
53
58
// States
@@ -453,7 +458,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
453
458
case `inProgressContainerPrefixKey` =>
454
459
creationIds -= key.split(" /" ).last
455
460
case `existingContainerPrefixKey` =>
456
- containers -= key.split(" /" ).last
461
+ val containerId = key.split(" /" ).last
462
+ removeDeletedContainerFromRequestBuffer(containerId)
463
+ containers -= containerId
457
464
case _ =>
458
465
}
459
466
stay
@@ -499,6 +506,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
499
506
handleActivationRequest(request)
500
507
} else {
501
508
logging.info(this , s " Remove containerId because ${request.containerId} is not alive " )
509
+ removeDeletedContainerFromRequestBuffer(request.containerId)
502
510
sender ! GetActivationResponse (Left (NoActivationMessage ()))
503
511
containers -= request.containerId
504
512
stay
@@ -766,7 +774,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
766
774
activation.transid)
767
775
768
776
val totalTimeInScheduler = Interval (activation.transid.meta.start, Instant .now()).duration
769
- MetricEmitter .emitHistogramMetric(LoggingMarkers .SCHEDULER_WAIT_TIME (action.asString), totalTimeInScheduler.toMillis)
777
+ MetricEmitter .emitHistogramMetric(
778
+ LoggingMarkers .SCHEDULER_WAIT_TIME (action.asString),
779
+ totalTimeInScheduler.toMillis)
770
780
771
781
val activationResponse =
772
782
if (isWhiskError)
@@ -931,14 +941,26 @@ class MemoryQueue(private val etcdClient: EtcdClient,
931
941
} else None
932
942
}
933
943
944
+ private def removeDeletedContainerFromRequestBuffer (containerId : String ): Unit = {
945
+ requestBuffer = requestBuffer.filter { buffer =>
946
+ if (buffer.containerId.drop(1 ) == containerId) {
947
+ buffer.promise.trySuccess(Left (NoActivationMessage ()))
948
+ false
949
+ } else
950
+ true
951
+ }
952
+ }
953
+
934
954
private def handleActivationMessage (msg : ActivationMessage ) = {
935
955
logging.info(this , s " [ $invocationNamespace: $action: $stateName] got a new activation message ${msg.activationId}" )(
936
956
msg.transid)
937
957
in.incrementAndGet()
938
958
takeUncompletedRequest()
939
959
.map { res =>
940
960
val totalTimeInScheduler = Interval (msg.transid.meta.start, Instant .now()).duration
941
- MetricEmitter .emitHistogramMetric(LoggingMarkers .SCHEDULER_WAIT_TIME (action.asString), totalTimeInScheduler.toMillis)
961
+ MetricEmitter .emitHistogramMetric(
962
+ LoggingMarkers .SCHEDULER_WAIT_TIME (action.asString),
963
+ totalTimeInScheduler.toMillis)
942
964
res.trySuccess(Right (msg))
943
965
in.decrementAndGet()
944
966
stay
@@ -960,7 +982,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
960
982
this ,
961
983
s " [ $invocationNamespace: $action: $stateName] Get activation request ${request.containerId}, send one message: ${msg.activationId}" )
962
984
val totalTimeInScheduler = Interval (msg.transid.meta.start, Instant .now()).duration
963
- MetricEmitter .emitHistogramMetric(LoggingMarkers .SCHEDULER_WAIT_TIME (action.asString), totalTimeInScheduler.toMillis)
985
+ MetricEmitter .emitHistogramMetric(
986
+ LoggingMarkers .SCHEDULER_WAIT_TIME (action.asString),
987
+ totalTimeInScheduler.toMillis)
964
988
965
989
sender ! GetActivationResponse (Right (msg))
966
990
tryDisableActionThrottling()
0 commit comments