@@ -3,13 +3,13 @@ package org.apache.openwhisk.core.loadBalancer
3
3
import java .nio .charset .StandardCharsets
4
4
import java .util .concurrent .ThreadLocalRandom
5
5
import java .util .concurrent .atomic .LongAdder
6
-
7
6
import akka .actor .{Actor , ActorRef , ActorRefFactory , ActorSystem , Cancellable , Props }
8
7
import akka .event .Logging .InfoLevel
9
8
import akka .pattern .ask
10
9
import akka .util .Timeout
11
10
import org .apache .kafka .clients .producer .RecordMetadata
12
11
import org .apache .openwhisk .common .InvokerState .{Healthy , Offline , Unhealthy }
12
+ import org .apache .openwhisk .common .LoggingMarkers ._
13
13
import org .apache .openwhisk .common ._
14
14
import org .apache .openwhisk .core .connector ._
15
15
import org .apache .openwhisk .core .controller .Controller
@@ -336,6 +336,16 @@ class FPCPoolBalancer(config: WhiskConfig,
336
336
}
337
337
}
338
338
339
// Counter-metric markers for completion acks, created once per balancer instance and tagged with
// this controller's instance id. One marker exists per completion ack kind.

// Kind: RegularCompletionAck.
protected val LOADBALANCER_COMPLETION_ACK_REGULAR =
  LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularCompletionAck)

// Kind: ForcedCompletionAck.
protected val LOADBALANCER_COMPLETION_ACK_FORCED =
  LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedCompletionAck)

// Kind: RegularAfterForcedCompletionAck.
protected val LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED =
  LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularAfterForcedCompletionAck)

// Kind: ForcedAfterRegularCompletionAck.
protected val LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR =
  LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedAfterRegularCompletionAck)
348
+
339
349
/** Process the completion ack and update the state */
340
350
protected [loadBalancer] def processCompletion (aid : ActivationId ,
341
351
tid : TransactionId ,
@@ -360,8 +370,10 @@ class FPCPoolBalancer(config: WhiskConfig,
360
370
// the active ack is received as expected, and processing that message removed the promise
361
371
// from the corresponding map
362
372
logging.info(this , s " received completion ack for ' $aid', system error= $isSystemError" )(tid)
373
+ MetricEmitter .emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR )
363
374
} else {
364
375
logging.error(this , s " Failed to invoke action ${aid.toString}, error: timeout waiting for the active ack " )
376
+ MetricEmitter .emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED )
365
377
366
378
// the entry has timed out; if the active ack is still around, remove its entry also
367
379
// and complete the promise with a failure if necessary
@@ -379,11 +391,13 @@ class FPCPoolBalancer(config: WhiskConfig,
379
391
// Logging this condition as a warning because the invoker processed the activation and sent a completion
380
392
// message - but not in time.
381
393
logging.warn(this , s " received completion ack for ' $aid' which has no entry, system error= $isSystemError" )(tid)
394
+ MetricEmitter .emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED )
382
395
case None =>
383
396
// The entry has already been removed by a completion ack. This part of the code is reached by the timeout and can
384
397
// happen if completion ack and timeout happen roughly at the same time (the timeout was triggered before the completion
385
398
// ack canceled the timer). As the completion ack is already processed we don't have to do anything here.
386
399
logging.debug(this , s " forced completion ack for ' $aid' which has no entry " )(tid)
400
+ MetricEmitter .emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR )
387
401
}
388
402
}
389
403
@@ -601,6 +615,28 @@ class FPCPoolBalancer(config: WhiskConfig,
601
615
}
602
616
}
603
617
618
/**
 * Publishes the load balancer gauges: the number of invokers per health state, the total memory of
 * all usable invokers, and the in-flight activation count and memory tracked by this controller.
 *
 * Every gauge is emitted once the invoker health query has completed. A failed query is logged
 * rather than dropped silently; the returned future still carries the original failure.
 *
 * @return a future that completes after the gauges have been emitted
 */
def emitMetrics(): Future[Unit] = {
  invokerHealth()
    .map { invokers =>
      MetricEmitter.emitGaugeMetric(HEALTHY_INVOKERS, invokers.count(_.status == Healthy))
      MetricEmitter.emitGaugeMetric(UNHEALTHY_INVOKERS, invokers.count(_.status == Unhealthy))
      MetricEmitter.emitGaugeMetric(OFFLINE_INVOKERS, invokers.count(_.status == Offline))

      // Add both user memory and busy memory because user memory represents free memory in this case
      val usableMemoryMB = invokers.iterator
        .filter(_.status.isUsable)
        .map(invoker => invoker.id.userMemory.toMB + invoker.id.busyMemory.getOrElse(ByteSize(0, SizeUnits.BYTE)).toMB)
        .sum
      MetricEmitter.emitGaugeMetric(INVOKER_TOTALMEM, usableMemoryMB)

      MetricEmitter.emitGaugeMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), totalActivations.longValue)
      MetricEmitter.emitGaugeMetric(
        LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""),
        totalActivationMemory.longValue)
    }
    .andThen {
      // The periodic caller ignores the returned future, so surface failures here.
      case scala.util.Failure(t) =>
        logging.error(this, s"failed to emit load balancer metrics: ${t.getMessage}")
    }
}
637
+
638
// Emit the gauges every 10 seconds, with the first emission 10 seconds after this statement runs.
// NOTE(review): the Cancellable returned by the scheduler is discarded here — confirm the timer
// is stopped elsewhere when the balancer shuts down.
actorSystem.scheduler.scheduleAtFixedRate(10.seconds, 10.seconds)(new Runnable {
  override def run(): Unit = emitMetrics()
})
639
+
604
640
/** Gets the number of in-flight activations for a specific user. */
605
641
override def activeActivationsFor(namespace: UUID): Future[Int] = {
  // A namespace without a counter entry is reported as zero.
  val inFlight = activationsPerNamespace.get(namespace).fold(0)(_.intValue())
  Future.successful(inFlight)
}
0 commit comments