diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala index 5a1fe89f0f0..c0dbb783af6 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala @@ -591,8 +591,9 @@ object LoggingMarkers { // Time that is needed to produce message in kafka val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds) - val SCHEDULER_WAIT_TIME = - LogMarkerToken(scheduler, "waitTime", counter)(MeasurementUnit.none) + val SCHEDULER_KAFKA_WAIT_TIME = + LogMarkerToken(scheduler, "kafkaWaitTime", counter)(MeasurementUnit.none) + def SCHEDULER_WAIT_TIME(action: String) = LogMarkerToken(scheduler, "waitTime", counter, Some(action), Map("action" -> action))(MeasurementUnit.none) def SCHEDULER_KEEP_ALIVE(leaseId: Long) = LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none) @@ -603,6 +604,7 @@ object LoggingMarkers { LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none) def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) = LogMarkerToken(scheduler, "queueActivation", counter, Some(action), Map("action" -> action))(MeasurementUnit.none) + /* * General markers */ diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala index d54e697e284..51a674457d6 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala @@ -19,7 +19,6 @@ package org.apache.openwhisk.core.scheduler.queue import java.time.{Duration, Instant} import java.util.concurrent.atomic.AtomicInteger - import akka.actor.Status.{Failure => FailureMessage} import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash} import akka.util.Timeout @@ -28,6 +27,7 @@ import org.apache.openwhisk.core.ConfigKeys import org.apache.openwhisk.core.ack.ActiveAck import org.apache.openwhisk.core.connector.ContainerCreationError.{TooManyConcurrentRequests, ZeroNamespaceLimit} import org.apache.openwhisk.core.connector._ +import org.apache.openwhisk.core.containerpool.Interval import org.apache.openwhisk.core.database.{NoDocumentException, UserContext} import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ @@ -35,12 +35,7 @@ import org.apache.openwhisk.core.etcd.EtcdClient import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys} import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, ActivationResponse => GetActivationResponse} -import org.apache.openwhisk.core.scheduler.message.{ - ContainerCreation, - ContainerDeletion, - FailedCreationJob, - SuccessfulCreationJob -} +import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob} import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig} import org.apache.openwhisk.core.service._ import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests} @@ -52,7 +47,7 @@ import scala.annotation.tailrec import scala.collection.immutable.Queue import scala.collection.mutable import scala.concurrent.duration._ -import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise} +import scala.concurrent.{ExecutionContextExecutor, Future, Promise, duration} import scala.util.{Failure, Success} // States @@ -769,6 +764,10 @@ class MemoryQueue(private val etcdClient: EtcdClient, this, s"[$invocationNamespace:$action:$stateName] complete activation ${activation.activationId} with error $message")( activation.transid) + + val totalTimeInScheduler = Interval(activation.transid.meta.start, Instant.now()).duration + MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), totalTimeInScheduler.toMillis) + val activationResponse = if (isWhiskError) generateFallbackActivation(activation, ActivationResponse.whiskError(message)) @@ -938,6 +937,8 @@ class MemoryQueue(private val etcdClient: EtcdClient, in.incrementAndGet() takeUncompletedRequest() .map { res => + val totalTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration + MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), totalTimeInScheduler.toMillis) res.trySuccess(Right(msg)) in.decrementAndGet() stay @@ -958,6 +959,9 @@ class MemoryQueue(private val etcdClient: EtcdClient, logging.info( this, s"[$invocationNamespace:$action:$stateName] Get activation request ${request.containerId}, send one message: ${msg.activationId}") + val totalTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration + MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), totalTimeInScheduler.toMillis) + sender ! GetActivationResponse(Right(msg)) tryDisableActionThrottling() } else { diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala index 6dbaffb0bbb..5178d2a3fb6 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala @@ -391,7 +391,7 @@ class QueueManager( // Drop the message that has not been scheduled for a long time val schedulingWaitTime = Interval(msg.transid.meta.start, Instant.now()).duration - MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME, schedulingWaitTime.toMillis) + MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_KAFKA_WAIT_TIME, schedulingWaitTime.toMillis) if (schedulingWaitTime > queueManagerConfig.maxSchedulingTime) { logging.warn(