Add Scheduler Queue Metric for Not Processing Any Activations #5386

Merged (3 commits, Mar 7, 2023)
Changes from 2 commits
@@ -612,6 +612,13 @@ object LoggingMarkers {
counter,
Some(actionWithoutVersion),
Map("namespace" -> namespace, "action" -> actionWithVersion))(MeasurementUnit.none)
def SCHEDULER_QUEUE_NOT_PROCESSING(namespace: String, actionWithVersion: String, actionWithoutVersion: String) =
LogMarkerToken(
scheduler,
"queueNotProcessing",
counter,
Some(actionWithoutVersion),
Map("namespace" -> namespace, "action" -> actionWithVersion))(MeasurementUnit.none)

/*
* General markers
@@ -46,10 +46,10 @@ import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcu
import pureconfig.loadConfigOrThrow
import spray.json._
import pureconfig.generic.auto._
import scala.collection.JavaConverters._

import scala.collection.JavaConverters._
import java.time.{Duration, Instant}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.collection.mutable
@@ -139,6 +139,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
checkToDropStaleActivation: (Clock,
Queue[TimeSeriesActivationEntry],
Long,
AtomicLong,
String,
WhiskActionMetaData,
MemoryQueueState,
@@ -173,6 +174,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,

private[queue] var queue = Queue.empty[TimeSeriesActivationEntry]
private[queue] var in = new AtomicInteger(0)
private[queue] val lastActivationPulledTime = new AtomicLong(Instant.now.toEpochMilli)
private[queue] val namespaceContainerCount = NamespaceContainerCount(invocationNamespace, etcdClient, watcherService)
private[queue] var averageDuration: Option[Double] = None
private[queue] var averageDurationBuffer = AverageRingBuffer(queueConfig.durationBufferSize)
@@ -574,7 +576,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
case Event(DropOld, _) =>
if (queue.nonEmpty && Duration
.between(queue.head.timestamp, clock.now())
.compareTo(Duration.ofMillis(actionRetentionTimeout)) < 0) {
.compareTo(Duration.ofMillis(actionRetentionTimeout)) >= 0) {
Contributor Author:

Based on the same comparison in the checkToDropStaleActivation function, I think this is actually a bug: the error log below never gets emitted, so there is no way to debug the container state of the queue when the not-processing case occurs. Could use a sanity check.
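For reference, a minimal sketch of what the corrected guard checks (illustrative only, not the exact patch; `timestamps` stands in for the queue of TimeSeriesActivationEntry timestamps and `retentionMs` for actionRetentionTimeout):

```scala
import java.time.{Clock, Duration, Instant}
import scala.collection.immutable.Queue

// Drop-and-log only when the oldest queued entry has waited at least `retentionMs`.
// With the previous `< 0` comparison the branch was skipped exactly when the head
// was stale, so the error log below never fired in the case it was meant to debug.
def shouldDropOld(timestamps: Queue[Instant], retentionMs: Long, clock: Clock): Boolean =
  timestamps.nonEmpty && Duration
    .between(timestamps.head, Instant.now(clock))
    .compareTo(Duration.ofMillis(retentionMs)) >= 0
```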

logging.error(
this,
s"[$invocationNamespace:$action:$stateName] Drop some stale activations for $revision, existing container is ${containers.size}, inProgress container is ${creationIds.size}, state data: $stateData, in is $in, current: ${queue.size}.")
@@ -920,6 +922,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
clock,
queue,
actionRetentionTimeout,
lastActivationPulledTime,
invocationNamespace,
actionMetaData,
stateName,
@@ -1024,6 +1027,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
MetricEmitter.emitHistogramMetric(
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion),
totalTimeInScheduler.toMillis)
lastActivationPulledTime.set(Instant.now.toEpochMilli)
res.trySuccess(Right(msg))
in.decrementAndGet()
stay
@@ -1049,6 +1053,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
MetricEmitter.emitHistogramMetric(
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion),
totalTimeInScheduler.toMillis)
lastActivationPulledTime.set(Instant.now.toEpochMilli)

sender ! GetActivationResponse(Right(msg))
tryDisableActionThrottling()
@@ -1186,6 +1191,7 @@ object MemoryQueue {
def checkToDropStaleActivation(clock: Clock,
queue: Queue[TimeSeriesActivationEntry],
maxRetentionMs: Long,
lastActivationExecutedTime: AtomicLong,
invocationNamespace: String,
actionMetaData: WhiskActionMetaData,
stateName: MemoryQueueState,
@@ -1201,6 +1207,13 @@
logging.info(
this,
s"[$invocationNamespace:$action:$stateName] some activations are stale msg: ${queue.head.msg.activationId}.")
val currentTime = Instant.now.toEpochMilli
if (currentTime - lastActivationExecutedTime.get() > maxRetentionMs) {
MetricEmitter.emitGaugeMetric(
LoggingMarkers
.SCHEDULER_QUEUE_NOT_PROCESSING(invocationNamespace, action.asString, action.toStringWithoutVersion),
Member:

I agree with this PR.
One question: if we collect the counter metric here, what kind of information can we get?
Can we differentiate the case where an activation request does not arrive because there are not enough containers from the case where dangling containers leave the system unable to handle activations?

Do we also need to collect a counter of activation requests that actually got activations?

Contributor Author:

For this metric, it should not fire in the case where an activation request does not arrive because there are not enough containers to keep up with the number of activations in the queue. In that case some activations should still be making progress, so the check that the time since the last executed activation exceeds the queue retention timeout won't pass. The value of the metric is thus effectively a boolean, 1 or 0: either the action is having problems or it's not.

I can try to think about whether there is additional data we should emit when hitting this case, but note that in my comment above I think I also have a bug fix: a log emitting the state of the queue isn't getting logged when activations time out.
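A minimal sketch of that condition (names mirror the diff; illustrative only):

```scala
import java.time.Instant
import java.util.concurrent.atomic.AtomicLong

// The gauge is effectively boolean: it reports the not-processing state only when
// no activation has been pulled for longer than the queue retention timeout,
// i.e. the queue is making no progress at all.
def queueNotProcessing(lastActivationPulledTime: AtomicLong, maxRetentionMs: Long): Boolean =
  Instant.now.toEpochMilli - lastActivationPulledTime.get() > maxRetentionMs
```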

Member:

But if the execution time of an action is longer than the threshold and there are not enough containers, doesn't the metric still fire?

Contributor Author:

Right, I noted that in the description:

If the system-configured queue retention timeout is not longer than the max timeout of an action, then all available containers can be in use up to the limit and validly busy for longer than the queue's max retention timeout; in that case the system operator should take action anyway to correct the configuration.

I could also look into making the queue retention time dynamic based on the action timeout of the function.

Contributor Author:

Okay, I added an additional check that the time since the last executed activation must also be greater than the action timeout. I think that's a fair case: if the system is configured to support long-running activations, you may still not want the queue retention time to be that high.
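Since that change is in the third commit and not shown in this diff, here is a rough sketch of the combined guard; `actionTimeoutMs` is assumed to come from the action's configured time limit (for example via actionMetaData.limits.timeout), so treat that accessor as an assumption:

```scala
import java.time.Instant
import java.util.concurrent.atomic.AtomicLong

// Fire only when the queue has been idle longer than BOTH the retention timeout
// and the action's own timeout, so long-running activations don't trip the metric.
def isQueueStuck(lastActivationExecutedTime: AtomicLong,
                 maxRetentionMs: Long,
                 actionTimeoutMs: Long): Boolean = {
  val idleMs = Instant.now.toEpochMilli - lastActivationExecutedTime.get()
  idleMs > maxRetentionMs && idleMs > actionTimeoutMs
}
```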

1)
}

queueRef ! DropOld
}