Skip to content

Commit 60ca660

Browse files
bdoyle0182 and Brendan Doyle authored
Add Scheduler Queue Metric for Not Processing Any Activations (#5386)
* Add Scheduler Queue Metric for Not Processing Any Activations
* fix timeout comparison
* account for action timeout being longer than queue retention

---------

Co-authored-by: Brendan Doyle <[email protected]>
1 parent 96ff327 commit 60ca660

File tree

2 files changed: +23 -3 lines changed
  • common/scala/src/main/scala/org/apache/openwhisk/common
  • core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue

2 files changed: +23 -3 lines changed

common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,13 @@ object LoggingMarkers {
612612
counter,
613613
Some(actionWithoutVersion),
614614
Map("namespace" -> namespace, "action" -> actionWithVersion))(MeasurementUnit.none)
615+
def SCHEDULER_QUEUE_NOT_PROCESSING(namespace: String, actionWithVersion: String, actionWithoutVersion: String) =
616+
LogMarkerToken(
617+
scheduler,
618+
"queueNotProcessing",
619+
counter,
620+
Some(actionWithoutVersion),
621+
Map("namespace" -> namespace, "action" -> actionWithVersion))(MeasurementUnit.none)
615622

616623
/*
617624
* General markers

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcu
4646
import pureconfig.loadConfigOrThrow
4747
import spray.json._
4848
import pureconfig.generic.auto._
49-
import scala.collection.JavaConverters._
5049

50+
import scala.collection.JavaConverters._
5151
import java.time.{Duration, Instant}
52-
import java.util.concurrent.atomic.AtomicInteger
52+
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
5353
import scala.annotation.tailrec
5454
import scala.collection.immutable.Queue
5555
import scala.collection.mutable
@@ -139,6 +139,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
139139
checkToDropStaleActivation: (Clock,
140140
Queue[TimeSeriesActivationEntry],
141141
Long,
142+
AtomicLong,
142143
String,
143144
WhiskActionMetaData,
144145
MemoryQueueState,
@@ -173,6 +174,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
173174

174175
private[queue] var queue = Queue.empty[TimeSeriesActivationEntry]
175176
private[queue] var in = new AtomicInteger(0)
177+
private[queue] val lastActivationPulledTime = new AtomicLong(Instant.now.toEpochMilli)
176178
private[queue] val namespaceContainerCount = NamespaceContainerCount(invocationNamespace, etcdClient, watcherService)
177179
private[queue] var averageDuration: Option[Double] = None
178180
private[queue] var averageDurationBuffer = AverageRingBuffer(queueConfig.durationBufferSize)
@@ -574,7 +576,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
574576
case Event(DropOld, _) =>
575577
if (queue.nonEmpty && Duration
576578
.between(queue.head.timestamp, clock.now())
577-
.compareTo(Duration.ofMillis(actionRetentionTimeout)) < 0) {
579+
.compareTo(Duration.ofMillis(actionRetentionTimeout)) >= 0) {
578580
logging.error(
579581
this,
580582
s"[$invocationNamespace:$action:$stateName] Drop some stale activations for $revision, existing container is ${containers.size}, inProgress container is ${creationIds.size}, state data: $stateData, in is $in, current: ${queue.size}.")
@@ -920,6 +922,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
920922
clock,
921923
queue,
922924
actionRetentionTimeout,
925+
lastActivationPulledTime,
923926
invocationNamespace,
924927
actionMetaData,
925928
stateName,
@@ -1024,6 +1027,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
10241027
MetricEmitter.emitHistogramMetric(
10251028
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion),
10261029
totalTimeInScheduler.toMillis)
1030+
lastActivationPulledTime.set(Instant.now.toEpochMilli)
10271031
res.trySuccess(Right(msg))
10281032
in.decrementAndGet()
10291033
stay
@@ -1049,6 +1053,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
10491053
MetricEmitter.emitHistogramMetric(
10501054
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion),
10511055
totalTimeInScheduler.toMillis)
1056+
lastActivationPulledTime.set(Instant.now.toEpochMilli)
10521057

10531058
sender ! GetActivationResponse(Right(msg))
10541059
tryDisableActionThrottling()
@@ -1186,6 +1191,7 @@ object MemoryQueue {
11861191
def checkToDropStaleActivation(clock: Clock,
11871192
queue: Queue[TimeSeriesActivationEntry],
11881193
maxRetentionMs: Long,
1194+
lastActivationExecutedTime: AtomicLong,
11891195
invocationNamespace: String,
11901196
actionMetaData: WhiskActionMetaData,
11911197
stateName: MemoryQueueState,
@@ -1201,6 +1207,13 @@ object MemoryQueue {
12011207
logging.info(
12021208
this,
12031209
s"[$invocationNamespace:$action:$stateName] some activations are stale msg: ${queue.head.msg.activationId}.")
1210+
val timeSinceLastActivationGrabbed = clock.now().toEpochMilli - lastActivationExecutedTime.get()
1211+
if (timeSinceLastActivationGrabbed > maxRetentionMs && timeSinceLastActivationGrabbed > actionMetaData.limits.timeout.millis) {
1212+
MetricEmitter.emitGaugeMetric(
1213+
LoggingMarkers
1214+
.SCHEDULER_QUEUE_NOT_PROCESSING(invocationNamespace, action.asString, action.toStringWithoutVersion),
1215+
1)
1216+
}
12041217

12051218
queueRef ! DropOld
12061219
}

0 commit comments

Comments
 (0)