Skip to content

add fpc load balancer metrics #5240

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,15 @@ object LoggingMarkers {
// Load balancer marker for the total number of offline blackbox invokers.
val OFFLINE_INVOKER_BLACKBOX =
LogMarkerToken(loadbalancer, "totalOfflineInvokerBlackBox", counter)(MeasurementUnit.none)

// Load balancer markers for the number of invokers per health state.
// Although declared with the `counter` state, FPCPoolBalancer.emitMetrics publishes them
// through MetricEmitter.emitGaugeMetric, i.e. each emission carries the current count.
val HEALTHY_INVOKERS =
LogMarkerToken(loadbalancer, "totalHealthyInvoker", counter)(MeasurementUnit.none)
val UNHEALTHY_INVOKERS =
LogMarkerToken(loadbalancer, "totalUnhealthyInvoker", counter)(MeasurementUnit.none)
val OFFLINE_INVOKERS =
LogMarkerToken(loadbalancer, "totalOfflineInvoker", counter)(MeasurementUnit.none)

// Load balancer marker for the total capacity. FPCPoolBalancer.emitMetrics publishes it as the
// sum, in MB, of user memory and busy memory over all usable invokers.
val INVOKER_TOTALMEM = LogMarkerToken(loadbalancer, "totalCapacity", counter)(MeasurementUnit.none)

// Kafka related markers
def KAFKA_QUEUE(topic: String) =
if (TransactionId.metricsKamonTags)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package org.apache.openwhisk.core.loadBalancer
import java.nio.charset.StandardCharsets
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.atomic.LongAdder

import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Props}
import akka.event.Logging.InfoLevel
import akka.pattern.ask
import akka.util.Timeout
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
import org.apache.openwhisk.common.LoggingMarkers._
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.controller.Controller
Expand Down Expand Up @@ -336,6 +336,16 @@ class FPCPoolBalancer(config: WhiskConfig,
}
}

// Singletons for counter metrics related to completion acks.
// Each marker is built once per balancer instance for this controller and is emitted from
// processCompletion via MetricEmitter.emitCounterMetric.

// Emitted when the completion ack is received as expected.
protected val LOADBALANCER_COMPLETION_ACK_REGULAR =
LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularCompletionAck)
// Emitted when the entry timed out while waiting for the active ack (forced completion).
protected val LOADBALANCER_COMPLETION_ACK_FORCED =
LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedCompletionAck)
// Emitted when a completion ack arrives for an activation that no longer has an entry,
// i.e. the invoker answered, but not in time.
protected val LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED =
LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularAfterForcedCompletionAck)
// Emitted when the timeout fires for an activation whose entry was already removed by a
// completion ack (timeout and ack happened at roughly the same time).
protected val LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR =
LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedAfterRegularCompletionAck)

/** Process the completion ack and update the state */
protected[loadBalancer] def processCompletion(aid: ActivationId,
tid: TransactionId,
Expand All @@ -360,8 +370,10 @@ class FPCPoolBalancer(config: WhiskConfig,
// the active ack is received as expected, and processing that message removed the promise
// from the corresponding map
logging.info(this, s"received completion ack for '$aid', system error=$isSystemError")(tid)
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR)
} else {
logging.error(this, s"Failed to invoke action ${aid.toString}, error: timeout waiting for the active ack")
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED)

// the entry has timed out; if the active ack is still around, remove its entry also
// and complete the promise with a failure if necessary
Expand All @@ -379,11 +391,13 @@ class FPCPoolBalancer(config: WhiskConfig,
// Logging this condition as a warning because the invoker processed the activation and sent a completion
// message - but not in time.
logging.warn(this, s"received completion ack for '$aid' which has no entry, system error=$isSystemError")(tid)
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED)
case None =>
// The entry has already been removed by a completion ack. This part of the code is reached by the timeout and can
// happen if completion ack and timeout happen roughly at the same time (the timeout was triggered before the completion
// ack canceled the timer). As the completion ack is already processed we don't have to do anything here.
logging.debug(this, s"forced completion ack for '$aid' which has no entry")(tid)
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR)
}
}

Expand Down Expand Up @@ -601,6 +615,28 @@ class FPCPoolBalancer(config: WhiskConfig,
}
}

/**
 * Publishes the load balancer gauges computed from the current invoker health snapshot.
 *
 * Once `invokerHealth()` completes, the following gauges are emitted:
 *  - the number of invokers in the Healthy, Unhealthy and Offline states,
 *  - the total capacity in MB over all usable invokers,
 *  - the number of in-flight activations and the in-flight activation memory of this controller.
 *
 * A failure while obtaining the snapshot or emitting the gauges is logged instead of being
 * dropped silently, because the periodic scheduler discards the value returned here.
 *
 * @return the result of the asynchronous emission, unchanged, so callers may still chain on it
 */
def emitMetrics() = {
  val emission = invokerHealth().map { invokers =>
    MetricEmitter.emitGaugeMetric(HEALTHY_INVOKERS, invokers.count(_.status == Healthy))
    MetricEmitter.emitGaugeMetric(UNHEALTHY_INVOKERS, invokers.count(_.status == Unhealthy))
    MetricEmitter.emitGaugeMetric(OFFLINE_INVOKERS, invokers.count(_.status == Offline))
    // Add both user memory and busy memory because user memory represents free memory in this case
    MetricEmitter.emitGaugeMetric(
      INVOKER_TOTALMEM,
      invokers.foldLeft(0L) { (total, curr) =>
        if (curr.status.isUsable) {
          curr.id.userMemory.toMB + curr.id.busyMemory.getOrElse(ByteSize(0, SizeUnits.BYTE)).toMB + total
        } else {
          // invokers that are not usable do not contribute any capacity
          total
        }
      })
    MetricEmitter.emitGaugeMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), totalActivations.longValue)
    MetricEmitter.emitGaugeMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""), totalActivationMemory.longValue)
  }
  // Surface failures: without this a failing health lookup would stop all gauges without any trace.
  emission.failed.foreach { t =>
    logging.error(this, s"failed to emit load balancer metrics: ${t.getMessage}")
  }
  emission
}

// Run emitMetrics() every 10 seconds, with the first run 10 seconds after this statement executes.
// NOTE(review): the handle returned by scheduleAtFixedRate is discarded here, so this task cannot be
// cancelled individually — confirm that it is acceptable for it to run until the actor system stops.
actorSystem.scheduler.scheduleAtFixedRate(10.seconds, 10.seconds)(() => emitMetrics())

/**
 * Reports how many activations are currently in flight for the given namespace.
 *
 * @param namespace the namespace to look up in `activationsPerNamespace`
 * @return an already-completed future holding the counter value as an Int,
 *         or 0 when there is no entry for the namespace
 */
override def activeActivationsFor(namespace: UUID): Future[Int] = {
  val inFlight = activationsPerNamespace.get(namespace).fold(0)(_.intValue())
  Future.successful(inFlight)
}
Expand Down