@@ -3,12 +3,12 @@ package org.apache.openwhisk.core.loadBalancer
3
3
import java .nio .charset .StandardCharsets
4
4
import java .util .concurrent .ThreadLocalRandom
5
5
import java .util .concurrent .atomic .LongAdder
6
-
7
6
import akka .actor .{Actor , ActorRef , ActorRefFactory , ActorSystem , Cancellable , Props }
8
7
import akka .event .Logging .InfoLevel
9
8
import akka .pattern .ask
10
9
import akka .util .Timeout
11
10
import org .apache .openwhisk .common .InvokerState .{Healthy , Offline , Unhealthy }
11
+ import org .apache .openwhisk .common .LoggingMarkers ._
12
12
import org .apache .openwhisk .common ._
13
13
import org .apache .openwhisk .core .connector ._
14
14
import org .apache .openwhisk .core .controller .Controller
@@ -335,6 +335,16 @@ class FPCPoolBalancer(config: WhiskConfig,
335
335
}
336
336
}
337
337
338
// Counter-metric markers for completion acks. Each marker is built once and reused for every
// emitted counter instead of being re-created per completion ack.

// regular completion ack
protected val LOADBALANCER_COMPLETION_ACK_REGULAR =
  LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularCompletionAck)

// forced completion ack
protected val LOADBALANCER_COMPLETION_ACK_FORCED =
  LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedCompletionAck)

// regular completion ack received after a forced one
protected val LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED =
  LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularAfterForcedCompletionAck)

// forced completion ack received after a regular one
protected val LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR =
  LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedAfterRegularCompletionAck)
347
+
338
348
/** Process the completion ack and update the state */
339
349
protected [loadBalancer] def processCompletion (aid : ActivationId ,
340
350
tid : TransactionId ,
@@ -359,8 +369,10 @@ class FPCPoolBalancer(config: WhiskConfig,
359
369
// the active ack is received as expected, and processing that message removed the promise
360
370
// from the corresponding map
361
371
logging.info(this , s " received completion ack for ' $aid', system error= $isSystemError" )(tid)
372
+ MetricEmitter .emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR )
362
373
} else {
363
374
logging.error(this , s " Failed to invoke action ${aid.toString}, error: timeout waiting for the active ack " )
375
+ MetricEmitter .emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED )
364
376
365
377
// the entry has timed out; if the active ack is still around, remove its entry also
366
378
// and complete the promise with a failure if necessary
@@ -378,11 +390,13 @@ class FPCPoolBalancer(config: WhiskConfig,
378
390
// Logging this condition as a warning because the invoker processed the activation and sent a completion
379
391
// message - but not in time.
380
392
logging.warn(this , s " received completion ack for ' $aid' which has no entry, system error= $isSystemError" )(tid)
393
+ MetricEmitter .emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED )
381
394
case None =>
382
395
// The entry has already been removed by a completion ack. This part of the code is reached by the timeout and can
383
396
// happen if completion ack and timeout happen roughly at the same time (the timeout was triggered before the completion
384
397
// ack canceled the timer). As the completion ack is already processed we don't have to do anything here.
385
398
logging.debug(this , s " forced completion ack for ' $aid' which has no entry " )(tid)
399
+ MetricEmitter .emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR )
386
400
}
387
401
}
388
402
@@ -600,6 +614,28 @@ class FPCPoolBalancer(config: WhiskConfig,
600
614
}
601
615
}
602
616
617
/**
 * Publishes gauge metrics for invoker health and for the load tracked by this balancer.
 *
 * Emits the number of healthy, unhealthy and offline invokers, the summed memory (in MB) of all
 * usable invokers, and the in-flight activation count and in-flight activation memory.
 *
 * If the invoker health cannot be obtained, the failure is logged as a warning instead of being
 * dropped silently, so missing gauges can be traced in the log.
 *
 * @return a future that completes once the gauges were emitted or the failure was logged
 */
def emitMetrics(): Future[Unit] =
  invokerHealth()
    .map { invokers =>
      MetricEmitter.emitGaugeMetric(HEALTHY_INVOKERS, invokers.count(_.status == Healthy))
      MetricEmitter.emitGaugeMetric(UNHEALTHY_INVOKERS, invokers.count(_.status == Unhealthy))
      MetricEmitter.emitGaugeMetric(OFFLINE_INVOKERS, invokers.count(_.status == Offline))

      // Add both user memory and busy memory because user memory represents free memory in this case.
      // Only invokers whose status is usable contribute to the total.
      val totalMemoryMB = invokers.foldLeft(0L) { (total, curr) =>
        if (curr.status.isUsable) {
          curr.id.userMemory.toMB + curr.id.busyMemory.getOrElse(ByteSize(0, SizeUnits.BYTE)).toMB + total
        } else {
          total
        }
      }
      MetricEmitter.emitGaugeMetric(INVOKER_TOTALMEM, totalMemoryMB)

      MetricEmitter.emitGaugeMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), totalActivations.longValue)
      MetricEmitter.emitGaugeMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, " "), totalActivationMemory.longValue)
    }
    .recover {
      // The caller that schedules this method ignores the returned future, so report failures here.
      case scala.util.control.NonFatal(t) =>
        logging.warn(this, s"failed to emit load balancer metrics: ${t.getMessage}")
    }
636
+
637
// Emit the gauges periodically: first run after an initial 10-second delay, then every 10 seconds.
actorSystem.scheduler.scheduleAtFixedRate(initialDelay = 10.seconds, interval = 10.seconds)(() => emitMetrics())
638
+
603
639
/** Gets the number of in-flight activations for a specific user. */
604
640
override def activeActivationsFor(namespace: UUID): Future[Int] = {
  // A namespace without a counter entry is reported as having zero in-flight activations.
  val inFlight = activationsPerNamespace.get(namespace).fold(0)(_.intValue())
  Future.successful(inFlight)
}
0 commit comments