Skip to content

Commit 919acfc

Browse files
committed
Implement FPCInvokerReactive
1 parent 184a926 commit 919acfc

File tree

15 files changed

+565
-4
lines changed

15 files changed

+565
-4
lines changed

ansible/group_vars/all

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,5 +465,8 @@ etcd_connect_string: "{% set ret = [] %}\
465465

466466
scheduler:
467467
protocol: "{{ scheduler_protocol | default('http') }}"
468+
grpc:
469+
tls: "{{ scheduler_grpc_tls | default(false) }}"
470+
maxPeek: "{{ scheduler_max_peek | default(128) }}"
468471
dataManagementService:
469472
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"

common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,8 @@ object LoggingMarkers {
494494
"initiator" -> invocationNamespace,
495495
"namespace" -> namespace,
496496
"action" -> action))(MeasurementUnit.none)
497+
def INVOKER_CONTAINER_CREATE(action: String, state: String) =
498+
LogMarkerToken(invoker, "creation", counter, None, Map("action" -> action, "state" -> state))(MeasurementUnit.none)
497499
val INVOKER_CONTAINER_HEALTH = LogMarkerToken(invoker, "containerHealth", start)(MeasurementUnit.time.milliseconds)
498500
val INVOKER_CONTAINER_HEALTH_FAILED_WARM =
499501
LogMarkerToken(invoker, "containerHealthFailed", counter, Some("warm"), Map("containerState" -> "warm"))(

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ object ConfigKeys {
243243
val runtimesRegistry = s"$containerFactory.runtimes-registry"
244244
val userImagesRegistry = s"$containerFactory.user-images-registry"
245245
val containerPool = "whisk.container-pool"
246+
val containerCreationMaxPeek = "whisk.invoker.container-creation.max-peek"
246247
val blacklist = "whisk.blacklist"
247248

248249
val kubernetes = "whisk.kubernetes"
@@ -295,6 +296,7 @@ object ConfigKeys {
295296

296297
val azBlob = "whisk.azure-blob"
297298

299+
val schedulerGrpcService = "whisk.scheduler.grpc"
298300
val schedulerMaxPeek = "whisk.scheduler.max-peek"
299301
val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention"
300302

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.ack
19+
20+
import org.apache.openwhisk.common.{Logging, TransactionId}
21+
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, MessageProducer}
22+
import org.apache.openwhisk.core.entity.{ControllerInstanceId, UUID, WhiskActivation}
23+
import spray.json.DefaultJsonProtocol._
24+
25+
import scala.concurrent.{ExecutionContext, Future}
26+
27+
class HealthActionAck(producer: MessageProducer)(implicit logging: Logging, ec: ExecutionContext) extends ActiveAck {
28+
override def apply(tid: TransactionId,
29+
activationResult: WhiskActivation,
30+
blockingInvoke: Boolean,
31+
controllerInstance: ControllerInstanceId,
32+
userId: UUID,
33+
acknowledegment: AcknowledegmentMessage): Future[Any] = {
34+
implicit val transid: TransactionId = tid
35+
36+
logging.debug(this, s"health action is successfully invoked")
37+
if (activationResult.response.isContainerError || activationResult.response.isWhiskError) {
38+
val actionPath =
39+
activationResult.annotations.getAs[String](WhiskActivation.pathAnnotation).getOrElse("unknown_path")
40+
logging.error(this, s"Failed to invoke action $actionPath, error: ${activationResult.response.toString}")
41+
}
42+
43+
Future.successful({})
44+
}
45+
}

common/scala/src/main/scala/org/apache/openwhisk/core/entity/Identity.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@ case class UserLimits(invocationsPerMinute: Option[Int] = None,
3535
concurrentInvocations: Option[Int] = None,
3636
firesPerMinute: Option[Int] = None,
3737
allowedKinds: Option[Set[String]] = None,
38-
storeActivations: Option[Boolean] = None)
38+
storeActivations: Option[Boolean] = None,
39+
warmedContainerKeepingCount: Option[Int] = None,
40+
warmedContainerKeepingTimeout: Option[String] = None)
3941

4042
object UserLimits extends DefaultJsonProtocol {
4143
val standardUserLimits = UserLimits()
4244

43-
implicit val serdes = jsonFormat5(UserLimits.apply)
45+
implicit val serdes = jsonFormat7(UserLimits.apply)
4446
}
4547

4648
protected[core] case class Namespace(name: EntityName, uuid: UUID)

core/invoker/src/main/resources/application.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ whisk {
155155
#aka 'How long should a container sit idle until we kill it?'
156156
idle-container = 10 minutes
157157
pause-grace = 50 milliseconds
158+
keeping-duration = 60 minutes
158159
}
159160
action-health-check {
160161
enabled = false # if true, prewarm containers will be pinged periodically and warm containers will be pinged once after resumed

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -961,7 +961,9 @@ class ContainerProxy(factory: (TransactionId,
961961
}
962962
}
963963

964-
final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, pauseGrace: FiniteDuration)
964+
final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration,
965+
pauseGrace: FiniteDuration,
966+
keepingDuration: FiniteDuration)
965967
final case class ContainerProxyHealthCheckConfig(enabled: Boolean, checkPeriod: FiniteDuration, maxFails: Int)
966968
final case class ContainerProxyActivationErrorLogConfig(applicationErrors: Boolean,
967969
developerErrors: Boolean,

0 commit comments

Comments
 (0)