@@ -46,10 +46,10 @@ import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcu
46
46
import pureconfig .loadConfigOrThrow
47
47
import spray .json ._
48
48
import pureconfig .generic .auto ._
49
- import scala .collection .JavaConverters ._
50
49
50
+ import scala .collection .JavaConverters ._
51
51
import java .time .{Duration , Instant }
52
- import java .util .concurrent .atomic .AtomicInteger
52
+ import java .util .concurrent .atomic .{ AtomicInteger , AtomicLong }
53
53
import scala .annotation .tailrec
54
54
import scala .collection .immutable .Queue
55
55
import scala .collection .mutable
@@ -139,6 +139,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
139
139
checkToDropStaleActivation : (Clock ,
140
140
Queue [TimeSeriesActivationEntry ],
141
141
Long ,
142
+ AtomicLong ,
142
143
String ,
143
144
WhiskActionMetaData ,
144
145
MemoryQueueState ,
@@ -173,6 +174,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
173
174
174
175
private [queue] var queue = Queue .empty[TimeSeriesActivationEntry ]
175
176
private [queue] var in = new AtomicInteger (0 )
177
+ private [queue] val lastActivationPulledTime = new AtomicLong (Instant .now.toEpochMilli)
176
178
private [queue] val namespaceContainerCount = NamespaceContainerCount (invocationNamespace, etcdClient, watcherService)
177
179
private [queue] var averageDuration : Option [Double ] = None
178
180
private [queue] var averageDurationBuffer = AverageRingBuffer (queueConfig.durationBufferSize)
@@ -574,7 +576,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
574
576
case Event (DropOld , _) =>
575
577
if (queue.nonEmpty && Duration
576
578
.between(queue.head.timestamp, clock.now())
577
- .compareTo(Duration .ofMillis(actionRetentionTimeout)) < 0 ) {
579
+ .compareTo(Duration .ofMillis(actionRetentionTimeout)) >= 0 ) {
578
580
logging.error(
579
581
this ,
580
582
s " [ $invocationNamespace: $action: $stateName] Drop some stale activations for $revision, existing container is ${containers.size}, inProgress container is ${creationIds.size}, state data: $stateData, in is $in, current: ${queue.size}. " )
@@ -920,6 +922,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
920
922
clock,
921
923
queue,
922
924
actionRetentionTimeout,
925
+ lastActivationPulledTime,
923
926
invocationNamespace,
924
927
actionMetaData,
925
928
stateName,
@@ -1024,6 +1027,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
1024
1027
MetricEmitter .emitHistogramMetric(
1025
1028
LoggingMarkers .SCHEDULER_WAIT_TIME (action.asString, action.toStringWithoutVersion),
1026
1029
totalTimeInScheduler.toMillis)
1030
+ lastActivationPulledTime.set(Instant .now.toEpochMilli)
1027
1031
res.trySuccess(Right (msg))
1028
1032
in.decrementAndGet()
1029
1033
stay
@@ -1049,6 +1053,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
1049
1053
MetricEmitter .emitHistogramMetric(
1050
1054
LoggingMarkers .SCHEDULER_WAIT_TIME (action.asString, action.toStringWithoutVersion),
1051
1055
totalTimeInScheduler.toMillis)
1056
+ lastActivationPulledTime.set(Instant .now.toEpochMilli)
1052
1057
1053
1058
sender ! GetActivationResponse (Right (msg))
1054
1059
tryDisableActionThrottling()
@@ -1186,6 +1191,7 @@ object MemoryQueue {
1186
1191
def checkToDropStaleActivation (clock : Clock ,
1187
1192
queue : Queue [TimeSeriesActivationEntry ],
1188
1193
maxRetentionMs : Long ,
1194
+ lastActivationExecutedTime : AtomicLong ,
1189
1195
invocationNamespace : String ,
1190
1196
actionMetaData : WhiskActionMetaData ,
1191
1197
stateName : MemoryQueueState ,
@@ -1201,6 +1207,13 @@ object MemoryQueue {
1201
1207
logging.info(
1202
1208
this ,
1203
1209
s " [ $invocationNamespace: $action: $stateName] some activations are stale msg: ${queue.head.msg.activationId}. " )
1210
+ val timeSinceLastActivationGrabbed = clock.now().toEpochMilli - lastActivationExecutedTime.get()
1211
+ if (timeSinceLastActivationGrabbed > maxRetentionMs && timeSinceLastActivationGrabbed > actionMetaData.limits.timeout.millis) {
1212
+ MetricEmitter .emitGaugeMetric(
1213
+ LoggingMarkers
1214
+ .SCHEDULER_QUEUE_NOT_PROCESSING (invocationNamespace, action.asString, action.toStringWithoutVersion),
1215
+ 1 )
1216
+ }
1204
1217
1205
1218
queueRef ! DropOld
1206
1219
}
0 commit comments