diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index bdd1bec0728..0f9b107b8d5 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -465,6 +465,8 @@ etcd_connect_string: "{% set ret = [] %}\
 
 scheduler:
   protocol: "{{ scheduler_protocol | default('http') }}"
+  grpc:
+    tls: "{{ scheduler_grpc_tls | default(false) }}"
   maxPeek: "{{ scheduler_max_peek | default(128) }}"
   queueManager:
     maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second') }}"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index e1859deba53..5bbf43a70c8 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -510,6 +510,8 @@ object LoggingMarkers {
       "initiator" -> invocationNamespace,
       "namespace" -> namespace,
       "action" -> action))(MeasurementUnit.none)
+  def INVOKER_CONTAINER_CREATE(action: String, state: String) =
+    LogMarkerToken(invoker, "creation", counter, None, Map("action" -> action, "state" -> state))(MeasurementUnit.none)
   val INVOKER_CONTAINER_HEALTH = LogMarkerToken(invoker, "containerHealth", start)(MeasurementUnit.time.milliseconds)
   val INVOKER_CONTAINER_HEALTH_FAILED_WARM =
     LogMarkerToken(invoker, "containerHealthFailed", counter, Some("warm"), Map("containerState" -> "warm"))(
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 484da2e6b78..e50fbb4c473 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -243,6 +243,7 @@ object ConfigKeys {
   val runtimesRegistry = s"$containerFactory.runtimes-registry"
   val userImagesRegistry = s"$containerFactory.user-images-registry"
   val containerPool = "whisk.container-pool"
+  val containerCreationMaxPeek = "whisk.invoker.container-creation.max-peek"
   val blacklist = "whisk.blacklist"
 
   val kubernetes = "whisk.kubernetes"
@@ -294,6 +295,7 @@ object ConfigKeys {
 
   val azBlob = "whisk.azure-blob"
 
+  val schedulerGrpcService = "whisk.scheduler.grpc"
   val schedulerMaxPeek = "whisk.scheduler.max-peek"
   val schedulerQueue = "whisk.scheduler.queue"
   val schedulerQueueManager = "whisk.scheduler.queue-manager"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/ack/HealthActionAck.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/HealthActionAck.scala
new file mode 100644
index 00000000000..83c4acd3ba2
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/HealthActionAck.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.ack
+
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, MessageProducer}
+import org.apache.openwhisk.core.entity.{ControllerInstanceId, UUID, WhiskActivation}
+import spray.json.DefaultJsonProtocol._
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class HealthActionAck(producer: MessageProducer)(implicit logging: Logging, ec: ExecutionContext) extends ActiveAck {
+  override def apply(tid: TransactionId,
+                     activationResult: WhiskActivation,
+                     blockingInvoke: Boolean,
+                     controllerInstance: ControllerInstanceId,
+                     userId: UUID,
+                     acknowledegment: AcknowledegmentMessage): Future[Any] = {
+    implicit val transid: TransactionId = tid
+
+    if (activationResult.response.isContainerError || activationResult.response.isWhiskError) {
+      val actionPath =
+        activationResult.annotations.getAs[String](WhiskActivation.pathAnnotation).getOrElse("unknown_path")
+      logging.error(this, s"Failed to invoke action $actionPath, error: ${activationResult.response.toString}")
+    } else {
+      logging.debug(this, "health action was successfully invoked")
+    }
+    Future.successful({})
+  }
+}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
index 57b9f31e85f..b05ae7021eb 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala
@@ -35,12 +35,14 @@ case class UserLimits(invocationsPerMinute: Option[Int] = None,
                       concurrentInvocations: Option[Int] = None,
                       firesPerMinute: Option[Int] = None,
                       allowedKinds: Option[Set[String]] = None,
-                      storeActivations: Option[Boolean] = None)
+                      storeActivations: Option[Boolean] = None,
+                      warmedContainerKeepingCount: Option[Int] = None,
+                      warmedContainerKeepingTimeout: Option[String] = None)
 
 object UserLimits extends DefaultJsonProtocol {
   val standardUserLimits = UserLimits()
 
-  implicit val serdes = jsonFormat5(UserLimits.apply)
+  implicit val serdes = jsonFormat7(UserLimits.apply)
 }
 
 protected[core] case class Namespace(name: EntityName, uuid: UUID)
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 9aeea1338c3..2dc80d48342 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -155,6 +155,7 @@ whisk {
         #aka 'How long should a container sit idle until we kill it?'
         idle-container = 10 minutes
         pause-grace = 50 milliseconds
+        keeping-duration = 60 minutes
       }
       action-health-check {
         enabled = false # if true, prewarm containers will be pinged periodically and warm containers will be pinged once after resumed
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index f50f6ca803c..1f8476c296c 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -961,7 +961,9 @@ class ContainerProxy(factory: (TransactionId,
     }
   }
 
-final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, pauseGrace: FiniteDuration)
+final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration,
+                                             pauseGrace: FiniteDuration,
+                                             keepingDuration: FiniteDuration)
 final case class ContainerProxyHealthCheckConfig(enabled: Boolean, checkPeriod: FiniteDuration, maxFails: Int)
 final case class ContainerProxyActivationErrorLogConfig(applicationErrors: Boolean, developerErrors: Boolean,
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
new file mode 100644
index 00000000000..8a0fdc4720f
--- /dev/null
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.openwhisk.core.invoker + +import akka.Done +import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, Props} +import akka.grpc.GrpcClientSettings +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.Route +import com.ibm.etcd.api.Event.EventType +import com.ibm.etcd.client.kv.KvClient.Watch +import com.ibm.etcd.client.kv.WatchUpdate +import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.openwhisk.common._ +import org.apache.openwhisk.core.ack.{ActiveAck, HealthActionAck, MessagingActiveAck, UserEventSender} +import org.apache.openwhisk.core.connector._ +import org.apache.openwhisk.core.containerpool._ +import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider +import org.apache.openwhisk.core.containerpool.v2._ +import org.apache.openwhisk.core.database.{UserContext, _} +import org.apache.openwhisk.core.entity._ +import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.{containerPrefix} +import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue +import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, SchedulerKeys} +import org.apache.openwhisk.core.etcd.EtcdType._ +import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig} +import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates} +import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService} +import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig} +import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest} +import org.apache.openwhisk.spi.SpiLoader +import pureconfig._ +import pureconfig.generic.auto._ + +import scala.collection.JavaConverters._ +import scala.collection.concurrent.TrieMap +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} +import scala.util.{Failure, Success, Try} + +case class GrpcServiceConfig(tls: Boolean) + +object FPCInvokerReactive extends InvokerProvider { + + override def instance( + config: WhiskConfig, + instance: InvokerInstanceId, + producer: MessageProducer, + poolConfig: ContainerPoolConfig, + limitsConfig: ConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore = + new FPCInvokerReactive(config, instance, producer, poolConfig, limitsConfig) +} + +class FPCInvokerReactive(config: WhiskConfig, + instance: InvokerInstanceId, + producer: MessageProducer, + poolConfig: ContainerPoolConfig = + loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool), + limitsConfig: ConcurrencyLimitConfig = loadConfigOrThrow[ConcurrencyLimitConfig]( + ConfigKeys.concurrencyLimit))(implicit actorSystem: ActorSystem, logging: Logging) + extends InvokerCore { + + implicit val ec: ExecutionContext = actorSystem.dispatcher + implicit val exe: ExecutionContextExecutor = actorSystem.dispatcher + implicit val cfg: WhiskConfig = config + + private val logsProvider = SpiLoader.get[LogStoreProvider].instance(actorSystem) + logging.info(this, s"LogStoreProvider: ${logsProvider.getClass}") + + private val etcdClient = EtcdClient(loadConfigOrThrow[EtcdConfig](ConfigKeys.etcd).hosts) + + private val grpcConfig = loadConfigOrThrow[GrpcServiceConfig](ConfigKeys.schedulerGrpcService) + + val watcherService: ActorRef = actorSystem.actorOf(WatcherService.props(etcdClient)) + + private val leaseService = + actorSystem.actorOf(LeaseKeepAliveService.props(etcdClient, instance, watcherService)) + + private val 
+    (f: ActorRefFactory) => f.actorOf(EtcdWorker.props(etcdClient, leaseService))
+
+  val dataManagementService: ActorRef =
+    actorSystem.actorOf(DataManagementService.props(watcherService, etcdWorkerFactory))
+
+  private val warmedSchedulers = TrieMap[String, String]()
+  private var warmUpWatcher: Option[Watch] = None
+
+  /**
+   * Factory used by the ContainerProxy to physically create a new container.
+   *
+   * Create and initialize the container factory before kicking off any other
+   * task or actor because further operation does not make sense if something
+   * goes wrong here. Initialization will throw an exception upon failure.
+   */
+  private val containerFactory =
+    SpiLoader
+      .get[ContainerFactoryProvider]
+      .instance(
+        actorSystem,
+        logging,
+        config,
+        instance,
+        Map(
+          "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"),
+          "--ulimit" -> Set("nofile=1024:1024"),
+          "--pids-limit" -> Set("1024")) ++ logsProvider.containerParameters)
+  containerFactory.init()
+
+  CoordinatedShutdown(actorSystem)
+    .addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "cleanup runtime containers") { () =>
+      containerFactory.cleanup()
+      Future.successful(Done)
+    }
+
+  /** Initialize needed databases */
+  private val entityStore = WhiskEntityStore.datastore()
+  private val activationStore =
+    SpiLoader.get[ActivationStoreProvider].instance(actorSystem, logging)
+
+  private val authStore = WhiskAuthStore.datastore()
+
+  private val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
+
+  Scheduler.scheduleWaitAtMost(loadConfigOrThrow[NamespaceBlacklistConfig](ConfigKeys.blacklist).pollInterval) { () =>
+    logging.debug(this, "running background job to update blacklist")
+    namespaceBlacklist.refreshBlacklist()(ec, TransactionId.invoker).andThen {
+      case Success(set) => logging.info(this, s"updated blacklist to ${set.size} entries")
+      case Failure(t) => logging.error(this, s"error on updating the blacklist: ${t.getMessage}")
+    }
+  }
+
+  val containerProxyTimeoutConfig = loadConfigOrThrow[ContainerProxyTimeoutConfig](ConfigKeys.containerProxyTimeouts)
+
+  private def getWarmedContainerLimit(invocationNamespace: String): Future[(Int, FiniteDuration)] = {
+    implicit val transid: TransactionId = TransactionId.unknown
+    Identity
+      .get(authStore, EntityName(invocationNamespace))(transid)
+      .map { identity =>
+        val warmedContainerKeepingCount = identity.limits.warmedContainerKeepingCount.getOrElse(1)
+        val warmedContainerKeepingTimeout = Try {
+          identity.limits.warmedContainerKeepingTimeout.map(Duration(_).toSeconds.seconds).get
+        }.getOrElse(containerProxyTimeoutConfig.keepingDuration)
+        (warmedContainerKeepingCount, warmedContainerKeepingTimeout)
+      }
+      .andThen {
+        case Failure(_: NoDocumentException) =>
+          logging.warn(this, s"namespace does not exist: $invocationNamespace")(transid)
+        case Failure(_: IllegalStateException) =>
+          logging.warn(this, s"namespace is not unique: $invocationNamespace")(transid)
+      }
+  }
+
+  private val ack = {
+    val sender = if (UserEvents.enabled) Some(new UserEventSender(producer)) else None
+    new MessagingActiveAck(producer, instance, sender)
+  }
+
+  // health action results do not need to be stored in the normal case
+  private val healthActionAck: ActiveAck = new HealthActionAck(producer)
+
+  private val collectLogs = new LogStoreCollector(logsProvider)
+
+  /** Stores an activation in the database. */
+  private val store = (tid: TransactionId, activation: WhiskActivation, isBlocking: Boolean, context: UserContext) => {
+    implicit val transid: TransactionId = tid
+    activationStore.storeAfterCheck(activation, isBlocking, None, context)(tid, notifier = None, logging)
+  }
+
+  private def healthActivationClientFactory(f: ActorRefFactory,
+                                            invocationNamespace: String,
+                                            fqn: FullyQualifiedEntityName,
+                                            rev: DocRevision,
+                                            schedulerHost: String,
+                                            rpcPort: Int,
+                                            containerId: ContainerId): ActorRef =
+    f.actorOf(Props(HealthActivationServiceClient()))
+
+  private def healthContainerProxyFactory(f: ActorRefFactory, healthManager: ActorRef): ActorRef = {
+    implicit val transId: TransactionId = TransactionId.invokerNanny
+    f.actorOf(
+      FunctionPullingContainerProxy
+        .props(
+          containerFactory.createContainer,
+          entityStore,
+          namespaceBlacklist,
+          WhiskAction.get,
+          dataManagementService,
+          healthActivationClientFactory,
+          healthActionAck,
+          store,
+          collectLogs,
+          getLiveContainerCount,
+          getWarmedContainerLimit,
+          instance,
+          healthManager,
+          poolConfig,
+          containerProxyTimeoutConfig))
+  }
+
+  private val invokerHealthManager =
+    actorSystem.actorOf(
+      InvokerHealthManager.props(instance, healthContainerProxyFactory, dataManagementService, entityStore))
+
+  invokerHealthManager ! Enable
+
+  private def activationClientFactory(etcd: EtcdClient)(
+    invocationNamespace: String,
+    fqn: FullyQualifiedEntityName,
+    schedulerHost: String,
+    rpcPort: Int,
+    tryOtherScheduler: Boolean = false): Future[ActivationServiceClient] = {
+
+    if (!tryOtherScheduler) {
+      val setting =
+        GrpcClientSettings
+          .connectToServiceAt(schedulerHost, rpcPort)
+          .withTls(grpcConfig.tls)
+      Future {
+        ActivationServiceClient(setting)
+      }.andThen {
+        case Failure(t) =>
+          logging.error(
+            this,
+            s"unable to create activation client for action ${fqn}: ${t} on original scheduler: ${schedulerHost}:${rpcPort}")
+      }
+    } else {
+      val leaderKey = queue(invocationNamespace, fqn, leader = true)
+      etcd
+        .get(leaderKey)
+        .flatMap { res =>
+          require(!res.getKvsList.isEmpty)
+
+          val endpoint: String = res.getKvsList.get(0).getValue
+          Future(SchedulerEndpoints.parse(endpoint))
+            .flatMap(Future.fromTry)
+            .map { schedulerEndpoint =>
+              val setting =
+                GrpcClientSettings
+                  .connectToServiceAt(schedulerEndpoint.host, schedulerEndpoint.rpcPort)
+                  .withTls(grpcConfig.tls)
+
+              ActivationServiceClient(setting)
+            }
+            .andThen {
+              case Failure(t) =>
+                logging.error(this, s"unable to create activation client for action ${fqn}: ${t}")
+            }
+        }
+    }
+
+  }
+
+  private def sendAckToScheduler(schedulerInstanceId: SchedulerInstanceId,
+                                 creationAckMessage: ContainerCreationAckMessage): Future[RecordMetadata] = {
+    val topic = s"${Invoker.topicPrefix}creationAck${schedulerInstanceId.asString}"
+    val reschedulable =
+      creationAckMessage.error.map(ContainerCreationError.whiskErrors.contains(_)).getOrElse(false)
+    if (reschedulable) {
+      MetricEmitter.emitCounterMetric(
+        LoggingMarkers.INVOKER_CONTAINER_CREATE(creationAckMessage.action.toString, "reschedule"))
+    } else if (creationAckMessage.error.nonEmpty) {
+      MetricEmitter.emitCounterMetric(
+        LoggingMarkers.INVOKER_CONTAINER_CREATE(creationAckMessage.action.toString, "failed"))
+    }
+
+    producer.send(topic, creationAckMessage).andThen {
+      case Success(_) =>
+        logging.info(
+          this,
+          s"Posted ${if (reschedulable) "rescheduling"
+          else if (creationAckMessage.error.nonEmpty) "failed"
+          else "success"} ack of container creation ${creationAckMessage.creationId} for ${creationAckMessage.invocationNamespace}/${creationAckMessage.action}")
+      case Failure(t) =>
+        logging.error(
+          this,
+          s"failed to send container creation ack message(${creationAckMessage.creationId}) for ${creationAckMessage.invocationNamespace}/${creationAckMessage.action} to scheduler: ${t.getMessage}")
+    }
+  }
+
+  /** Creates a ContainerProxy Actor when being called. */
+  private val childFactory = (f: ActorRefFactory) => {
+    implicit val transId: TransactionId = TransactionId.invokerNanny
+    f.actorOf(
+      FunctionPullingContainerProxy
+        .props(
+          containerFactory.createContainer,
+          entityStore,
+          namespaceBlacklist,
+          WhiskAction.get,
+          dataManagementService,
+          clientProxyFactory,
+          ack,
+          store,
+          collectLogs,
+          getLiveContainerCount,
+          getWarmedContainerLimit,
+          instance,
+          invokerHealthManager,
+          poolConfig,
+          containerProxyTimeoutConfig))
+  }
+
+  /** Creates an ActivationClientProxy Actor when being called. */
+  private def clientProxyFactory(f: ActorRefFactory,
+                                 invocationNamespace: String,
+                                 fqn: FullyQualifiedEntityName,
+                                 rev: DocRevision,
+                                 schedulerHost: String,
+                                 rpcPort: Int,
+                                 containerId: ContainerId): ActorRef = {
+    implicit val transId: TransactionId = TransactionId.invokerNanny
+    f.actorOf(
+      ActivationClientProxy
+        .props(invocationNamespace, fqn, rev, schedulerHost, rpcPort, containerId, activationClientFactory(etcdClient)))
+  }
+
+  val prewarmingConfigs: List[PrewarmingConfig] = {
+    ExecManifest.runtimesManifest.stemcells.flatMap {
+      case (mf, cells) =>
+        cells.map { cell =>
+          PrewarmingConfig(cell.initialCount, new CodeExecAsString(mf, "", None), cell.memory)
+        }
+    }.toList
+  }
+
+  private val pool =
+    actorSystem.actorOf(
+      ContainerPoolV2
+        .props(childFactory, invokerHealthManager, poolConfig, instance, prewarmingConfigs, sendAckToScheduler))
+
+  private def getLiveContainerCount(invocationNamespace: String,
+                                    fqn: FullyQualifiedEntityName,
+                                    revision: DocRevision): Future[Long] = {
+    val namespacePrefix = containerPrefix(ContainerKeys.namespacePrefix, invocationNamespace, fqn, Some(revision))
+    val inProgressPrefix = containerPrefix(ContainerKeys.inProgressPrefix, invocationNamespace, fqn, Some(revision))
+    val warmedPrefix = containerPrefix(ContainerKeys.warmedPrefix, invocationNamespace, fqn, Some(revision))
+    for {
+      namespaceCount <- etcdClient.getCount(namespacePrefix)
+      inProgressCount <- etcdClient.getCount(inProgressPrefix)
+      warmedCount <- etcdClient.getCount(warmedPrefix)
+    } yield {
+      namespaceCount + inProgressCount + warmedCount
+    }
+  }
+
+  /** Initialize message consumers */
+  private val msgProvider = SpiLoader.get[MessagingProvider]
+  // number of container-creation messages peeked per poll - a larger value improves throughput but risks message loss if the invoker crashes
+  private val maxPeek = loadConfigOrThrow[Int](ConfigKeys.containerCreationMaxPeek)
+  private var consumer: Option[ContainerMessageConsumer] = Some(
+    new ContainerMessageConsumer(
+      instance,
+      pool,
+      entityStore,
+      cfg,
+      msgProvider,
+      longPollDuration = 1.second,
+      maxPeek,
+      sendAckToScheduler))
+
+  override def enable(): Route = {
+    invokerHealthManager ! Enable
+    pool ! Enable
+    // re-enable consumer
+    if (consumer.isEmpty)
+      consumer = Some(
+        new ContainerMessageConsumer(
+          instance,
+          pool,
+          entityStore,
+          cfg,
+          msgProvider,
+          longPollDuration = 1.second,
+          maxPeek,
+          sendAckToScheduler))
+    warmUp()
+    complete("Successfully enabled invoker")
+  }
+
+  override def disable(): Route = {
+    invokerHealthManager ! GracefulShutdown
+    pool ! GracefulShutdown
+    consumer.foreach(_.close())
+    consumer = None
+    warmUpWatcher.foreach(_.close())
+    warmUpWatcher = None
+    complete("Successfully disabled invoker")
+  }
+
+  override def backfillPrewarm(): Route = {
+    pool ! AdjustPrewarmedContainer
+    complete("backfilling prewarm containers")
+  }
+
+  private val warmUpFetchRequest = FetchRequest(
+    TransactionId(TransactionId.generateTid()).serialize,
+    InvokerHealthManager.healthActionIdentity.namespace.name.asString,
+    WarmUp.warmUpAction.serialize,
+    DocRevision.empty.serialize) // a warm-up fetch request that carries no actual activation
+
+  // warm up grpc connection with scheduler
+  private def warmUpScheduler(scheduler: SchedulerEndpoints) = {
+    val setting =
+      GrpcClientSettings
+        .connectToServiceAt(scheduler.host, scheduler.rpcPort)
+        .withTls(grpcConfig.tls)
+    val client = ActivationServiceClient(setting)
+    client.fetchActivation(warmUpFetchRequest).andThen {
+      case _ =>
+        logging.info(this, s"Warmed up scheduler $scheduler")
+        client.close()
+    }
+  }
+
+  private def warmUp(): Unit = {
+    implicit val transId: TransactionId = TransactionId.warmUp
+    if (warmUpWatcher.isEmpty)
+      warmUpWatcher = Some(etcdClient.watch(SchedulerKeys.prefix, true) { res: WatchUpdate =>
+        res.getEvents.asScala.foreach {
+          event =>
+            event.getType match {
+              case EventType.DELETE =>
+                val key = event.getPrevKv.getKey
+                warmedSchedulers.remove(key)
+              case EventType.PUT =>
+                val key = event.getKv.getKey
+                val value = event.getKv.getValue
+                SchedulerStates
+                  .parse(value)
+                  .map { state =>
+                    // warm up new scheduler
+                    warmedSchedulers.getOrElseUpdate(key, {
+                      logging.info(this, s"Warm up scheduler ${state.sid}")
+                      warmUpScheduler(state.endpoints)
+                      value
+                    })
+                  }
+              case _ =>
+            }
+        }
+      })
+
+    etcdClient.getPrefix(SchedulerKeys.prefix).map { res =>
+      res.getKvsList.asScala.map { kv =>
+        val scheduler = kv.getKey
+        warmedSchedulers.getOrElseUpdate(
+          scheduler, {
+            logging.info(this, s"Warm up scheduler $scheduler")
+            SchedulerStates
+              .parse(kv.getValue)
+              .map { state =>
+                warmUpScheduler(state.endpoints)
+              }
+              .recover {
+                case t =>
+                  logging.error(this, s"Unexpected error $t")
+              }
+
+            kv.getValue
+          })
+
+      }
+    }
+  }
+  warmUp()
+}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 437beb314fa..6de751f2ec9 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -220,6 +220,7 @@ trait InvokerProvider extends Spi {
 trait InvokerCore {
   def enable(): Route
   def disable(): Route
+  def backfillPrewarm(): Route
 }
 
 /**
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 3177cb76d8a..f2d36b144aa 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -307,4 +307,8 @@ class InvokerReactive(
     complete("not supported")
   }
 
+  override def backfillPrewarm(): Route = {
+    complete("not supported")
+  }
+
 }
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index 6b69172bc32..352fa1677b8 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -140,13 +140,15 @@ whisk {
 
   scheduler {
     protocol = "{{ scheduler.protocol }}"
+    grpc {
+      tls = "{{ scheduler.grpc.tls | default('false') | lower }}"
+    }
     queue-manager {
       max-scheduling-time = "{{ scheduler.queueManager.maxSchedulingTime }}"
       max-retries-to-get-queue = "{{ scheduler.queueManager.maxRetriesToGetQueue }}"
     }
     max-peek = "{{ scheduler.maxPeek }}"
   }
-  }
 
 #test-only overrides so that tests can override defaults in application.conf (todo: move all defaults to reference.conf)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index 9b91d55e5cb..c77932007a5 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -117,7 +117,7 @@ class FunctionPullingContainerProxyTests
     false,
     1.second)
 
-  val timeoutConfig = ContainerProxyTimeoutConfig(5.seconds, 5.seconds)
+  val timeoutConfig = ContainerProxyTimeoutConfig(5.seconds, 5.seconds, 5.seconds)
 
   val messageTransId = TransactionId(TransactionId.testing.meta.id)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
index 153e228b7d9..109512892f4 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/DefaultInvokerServerTests.scala
@@ -130,6 +130,10 @@ class TestInvokerReactive extends InvokerCore with BasicHttpService {
     complete("")
   }
 
+  override def backfillPrewarm(): Route = {
+    complete("")
+  }
+
   def reset(): Unit = {
     enableCount = 0
     disableCount = 0
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
index 8cf8d911bc0..e387cd637c2 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/FPCInvokerServerTests.scala
@@ -129,6 +129,10 @@ class TestFPCInvokerReactive extends InvokerCore with BasicHttpService {
     complete("")
  }
 
+  override def backfillPrewarm(): Route = {
+    complete("")
+  }
+
   def reset(): Unit = {
     enableCount = 0
     disableCount = 0
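
Note on the new per-namespace keep-warm limits: warmedContainerKeepingCount and warmedContainerKeepingTimeout are optional overrides on UserLimits, and getWarmedContainerLimit in FPCInvokerReactive.scala falls back to one kept container and to container-proxy.timeouts.keeping-duration (60 minutes in the defaults above) when a namespace omits them or supplies an unparsable duration string. The standalone Scala sketch below mirrors that resolution logic; WarmedLimits and WarmedLimitResolver are illustrative stand-ins, not names from this change set.

import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for the two Option fields added to UserLimits.
case class WarmedLimits(keepingCount: Option[Int] = None, keepingTimeout: Option[String] = None)

object WarmedLimitResolver extends App {
  // Mirrors the invoker default `keeping-duration = 60 minutes`.
  val defaultKeepingDuration: FiniteDuration = 60.minutes

  // Resolve (count, timeout) for a namespace, falling back to the defaults
  // the same way the Try/getOrElse chain in getWarmedContainerLimit does.
  def resolve(limits: WarmedLimits): (Int, FiniteDuration) = {
    val count = limits.keepingCount.getOrElse(1) // keep one warmed container by default
    val timeout = Try(limits.keepingTimeout.map(Duration(_).toSeconds.seconds).get)
      .getOrElse(defaultKeepingDuration) // None or a bad duration string falls back
    (count, timeout)
  }

  println(resolve(WarmedLimits(Some(2), Some("30 minutes")))) // (2,1800 seconds)
  println(resolve(WarmedLimits()))                            // (1,3600 seconds)
}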