-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Add Scheduler Queue Metric for Not Processing Any Activations #5386
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,10 +46,10 @@ import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcu | |
import pureconfig.loadConfigOrThrow | ||
import spray.json._ | ||
import pureconfig.generic.auto._ | ||
import scala.collection.JavaConverters._ | ||
|
||
import scala.collection.JavaConverters._ | ||
import java.time.{Duration, Instant} | ||
import java.util.concurrent.atomic.AtomicInteger | ||
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} | ||
import scala.annotation.tailrec | ||
import scala.collection.immutable.Queue | ||
import scala.collection.mutable | ||
|
@@ -139,6 +139,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |
checkToDropStaleActivation: (Clock, | ||
Queue[TimeSeriesActivationEntry], | ||
Long, | ||
AtomicLong, | ||
String, | ||
WhiskActionMetaData, | ||
MemoryQueueState, | ||
|
@@ -173,6 +174,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |
|
||
private[queue] var queue = Queue.empty[TimeSeriesActivationEntry] | ||
private[queue] var in = new AtomicInteger(0) | ||
private[queue] val lastActivationPulledTime = new AtomicLong(Instant.now.toEpochMilli) | ||
private[queue] val namespaceContainerCount = NamespaceContainerCount(invocationNamespace, etcdClient, watcherService) | ||
private[queue] var averageDuration: Option[Double] = None | ||
private[queue] var averageDurationBuffer = AverageRingBuffer(queueConfig.durationBufferSize) | ||
|
@@ -574,7 +576,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |
case Event(DropOld, _) => | ||
if (queue.nonEmpty && Duration | ||
.between(queue.head.timestamp, clock.now()) | ||
.compareTo(Duration.ofMillis(actionRetentionTimeout)) < 0) { | ||
.compareTo(Duration.ofMillis(actionRetentionTimeout)) >= 0) { | ||
logging.error( | ||
this, | ||
s"[$invocationNamespace:$action:$stateName] Drop some stale activations for $revision, existing container is ${containers.size}, inProgress container is ${creationIds.size}, state data: $stateData, in is $in, current: ${queue.size}.") | ||
|
@@ -920,6 +922,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |
clock, | ||
queue, | ||
actionRetentionTimeout, | ||
lastActivationPulledTime, | ||
invocationNamespace, | ||
actionMetaData, | ||
stateName, | ||
|
@@ -1024,6 +1027,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |
MetricEmitter.emitHistogramMetric( | ||
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion), | ||
totalTimeInScheduler.toMillis) | ||
lastActivationPulledTime.set(Instant.now.toEpochMilli) | ||
res.trySuccess(Right(msg)) | ||
in.decrementAndGet() | ||
stay | ||
|
@@ -1049,6 +1053,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |
MetricEmitter.emitHistogramMetric( | ||
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion), | ||
totalTimeInScheduler.toMillis) | ||
lastActivationPulledTime.set(Instant.now.toEpochMilli) | ||
|
||
sender ! GetActivationResponse(Right(msg)) | ||
tryDisableActionThrottling() | ||
|
@@ -1186,6 +1191,7 @@ object MemoryQueue { | |
def checkToDropStaleActivation(clock: Clock, | ||
queue: Queue[TimeSeriesActivationEntry], | ||
maxRetentionMs: Long, | ||
lastActivationExecutedTime: AtomicLong, | ||
invocationNamespace: String, | ||
actionMetaData: WhiskActionMetaData, | ||
stateName: MemoryQueueState, | ||
|
@@ -1201,6 +1207,13 @@ object MemoryQueue { | |
logging.info( | ||
this, | ||
s"[$invocationNamespace:$action:$stateName] some activations are stale msg: ${queue.head.msg.activationId}.") | ||
val currentTime = Instant.now.toEpochMilli | ||
if (currentTime - lastActivationExecutedTime.get() > maxRetentionMs) { | ||
MetricEmitter.emitGaugeMetric( | ||
LoggingMarkers | ||
.SCHEDULER_QUEUE_NOT_PROCESSING(invocationNamespace, action.asString, action.toStringWithoutVersion), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I agree with this PR. Do we also need to collect the counter of activation requests that actually got activations? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This metric should not fire in the case where an activation request does not arrive because there are not enough containers to adequately meet the throughput of how many activations are in the queue. In such a case, some activations should still be making progress, so it won't pass the check that the time since the last activation was executed is greater than the retention timeout of the queue. The value of the metric is thus a boolean, 1 or 0: either the action is having problems or it's not. I can try to think of whether there's any additional data we should be emitting when hitting this case, but in my comment above I think I also have a bug fix where a log emitting the state of the queue isn't properly getting logged when activations are timed out. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. But if the execution time of an action is bigger than the threshold, and there are not enough containers, doesn't it fire the metric? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Right, I noted that in the description.
I could also look into making the queue retention time dynamic with respect to the action timeout of the function. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Okay, I added an additional check that the time since the last activation was executed must also be greater than the action timeout. I think that's a fair case: if the system is configured to support long-running activations, you may still not want the queue retention time to also be that high. |
||
1) | ||
} | ||
|
||
queueRef ! DropOld | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the same comparison in the `checkToDropStaleActivation` function, I think this is actually a bug: the error log below this will never get emitted, so it cannot be used to debug the container state of the queue when the not-processing case occurs. This could use a sanity check.