[New Scheduler] Implement FPCInvokerReactive #5125

Merged · 3 commits · Jan 13, 2022
2 changes: 2 additions & 0 deletions ansible/group_vars/all
@@ -465,6 +465,8 @@ etcd_connect_string: "{% set ret = [] %}\

scheduler:
  protocol: "{{ scheduler_protocol | default('http') }}"
  grpc:
    tls: "{{ scheduler_grpc_tls | default(false) }}"
  maxPeek: "{{ scheduler_max_peek | default(128) }}"
  queueManager:
    maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second') }}"
@@ -510,6 +510,8 @@ object LoggingMarkers {
"initiator" -> invocationNamespace,
"namespace" -> namespace,
"action" -> action))(MeasurementUnit.none)
def INVOKER_CONTAINER_CREATE(action: String, state: String) =
LogMarkerToken(invoker, "creation", counter, None, Map("action" -> action, "state" -> state))(MeasurementUnit.none)
val INVOKER_CONTAINER_HEALTH = LogMarkerToken(invoker, "containerHealth", start)(MeasurementUnit.time.milliseconds)
val INVOKER_CONTAINER_HEALTH_FAILED_WARM =
LogMarkerToken(invoker, "containerHealthFailed", counter, Some("warm"), Map("containerState" -> "warm"))(
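For context, counter tokens like this are emitted through MetricEmitter elsewhere in OpenWhisk. The sketch below is an illustrative use of the new marker, not code from this PR; the action name and "failed" state are assumed example values.

    // Illustrative usage only (not part of this diff); values are made-up examples.
    import org.apache.openwhisk.common.{LoggingMarkers, MetricEmitter}

    MetricEmitter.emitCounterMetric(
      LoggingMarkers.INVOKER_CONTAINER_CREATE(action = "guest/hello", state = "failed"))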
@@ -243,6 +243,7 @@ object ConfigKeys {
val runtimesRegistry = s"$containerFactory.runtimes-registry"
val userImagesRegistry = s"$containerFactory.user-images-registry"
val containerPool = "whisk.container-pool"
val containerCreationMaxPeek = "whisk.invoker.container-creation.max-peek"
val blacklist = "whisk.blacklist"

val kubernetes = "whisk.kubernetes"
@@ -294,6 +295,7 @@ ConfigKeys {

val azBlob = "whisk.azure-blob"

val schedulerGrpcService = "whisk.scheduler.grpc"
val schedulerMaxPeek = "whisk.scheduler.max-peek"
val schedulerQueue = "whisk.scheduler.queue"
val schedulerQueueManager = "whisk.scheduler.queue-manager"
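As a rough sketch of how these keys would be consumed (assumed usage following the existing pureconfig pattern in OpenWhisk; the SchedulerGrpcConfig case class and its single tls field are illustrative, not taken from this PR):

    import pureconfig._
    import pureconfig.generic.auto._
    import org.apache.openwhisk.core.ConfigKeys

    // Assumed shape for whisk.scheduler.grpc (only the tls flag is shown in this PR).
    case class SchedulerGrpcConfig(tls: Boolean)

    val grpcConfig = loadConfigOrThrow[SchedulerGrpcConfig](ConfigKeys.schedulerGrpcService)
    // whisk.invoker.container-creation.max-peek holds a plain integer.
    val creationMaxPeek = loadConfigOrThrow[Int](ConfigKeys.containerCreationMaxPeek)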
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.ack

import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, MessageProducer}
import org.apache.openwhisk.core.entity.{ControllerInstanceId, UUID, WhiskActivation}
import spray.json.DefaultJsonProtocol._

import scala.concurrent.{ExecutionContext, Future}

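/**
 * An ActiveAck for the built-in health (test) action: it never publishes an acknowledgement
 * message; it only logs the result and reports health invocations that failed.
 */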
class HealthActionAck(producer: MessageProducer)(implicit logging: Logging, ec: ExecutionContext) extends ActiveAck {
  override def apply(tid: TransactionId,
                     activationResult: WhiskActivation,
                     blockingInvoke: Boolean,
                     controllerInstance: ControllerInstanceId,
                     userId: UUID,
                     acknowledegment: AcknowledegmentMessage): Future[Any] = {
    implicit val transid: TransactionId = tid

    logging.debug(this, s"health action was successfully invoked")
    if (activationResult.response.isContainerError || activationResult.response.isWhiskError) {
      val actionPath =
        activationResult.annotations.getAs[String](WhiskActivation.pathAnnotation).getOrElse("unknown_path")
      logging.error(this, s"Failed to invoke action $actionPath, error: ${activationResult.response.toString}")
    }

    Future.successful({})
  }
}
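A minimal wiring sketch (assumed usage, not from this diff; all names below are placeholders, using the imports from the file above): since HealthActionAck only logs, it can stand in wherever an ActiveAck is required for health-action activations.

    // Sketch only; nothing here is prescribed by the PR.
    def ackHealthActivation(producer: MessageProducer,
                            activation: WhiskActivation,
                            controller: ControllerInstanceId,
                            user: UUID,
                            msg: AcknowledegmentMessage)(implicit tid: TransactionId,
                                                         logging: Logging,
                                                         ec: ExecutionContext): Future[Any] = {
      val ack: ActiveAck = new HealthActionAck(producer)
      ack(tid, activation, false, controller, user, msg)
    }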
@@ -35,12 +35,14 @@ case class UserLimits(invocationsPerMinute: Option[Int] = None,
                      concurrentInvocations: Option[Int] = None,
                      firesPerMinute: Option[Int] = None,
                      allowedKinds: Option[Set[String]] = None,
                      storeActivations: Option[Boolean] = None)
                      storeActivations: Option[Boolean] = None,
                      warmedContainerKeepingCount: Option[Int] = None,
                      warmedContainerKeepingTimeout: Option[String] = None)
Contributor

Maybe this affects non-scheduler code?

Contributor Author (@ningyougang, Jun 10, 2021)

Yes, it affects the code below:

case class UserLimits(invocationsPerMinute: Option[Int] = None,
                      concurrentInvocations: Option[Int] = None,
                      firesPerMinute: Option[Int] = None,
                      allowedKinds: Option[Set[String]] = None,
                      storeActivations: Option[Boolean] = None,
                      warmedContainerKeepingCount: Option[Int] = None,        // this is the new field
                      warmedContainerKeepingTimeout: Option[String] = None)   // this is the new field

If we don't want to touch the code above, one option is to make the keepingCount and keepingTimeout values fixed for now rather than reading them from the database, e.g.

  • keepingCount: 1
  • keepingTimeout: 60.minutes

But then every frequently invoked action would keep at least one container alive for 60 minutes, which is obviously a waste of resources.

If we add the two fields above, the benefit is that both values can be configured in the database. Because they apply per namespace, all of a namespace's actions compete for the same keepingCount.

So my opinion is that, even though it touches non-scheduler code, it is better to add these two fields to case class UserLimits; the non-scheduler code still runs fine with them added.

Contributor Author

@bdoyle0182 @style95, since the two fields above (warmedContainerKeepingCount, warmedContainerKeepingTimeout) are added to case class UserLimits, do you have any opinion on this?

Contributor

I'm not opposed to infecting non-scheduler code where absolutely necessary. I think it's unreasonable to suggest we can make such a large architectural change without touching existing code at all. So long as we're avoiding a breaking change I see no issue with this

Contributor

@ningyougang @style95 also sorry I haven't had time to dedicate to the scheduler recently, but I will review the remaining PRs this week.

Contributor Author

@bdoyle0182, thanks for your review. This just adds 2 fields to case class UserLimits; it is not a large architectural change.

Contributor

Yea I just meant the entire new scheduler is a large architectural change so it's unreasonable to say we can't ever touch other components while implementing. We should avoid where possible, but it will be necessary in a couple situations like this one.


object UserLimits extends DefaultJsonProtocol {
  val standardUserLimits = UserLimits()

  implicit val serdes = jsonFormat5(UserLimits.apply)
  implicit val serdes = jsonFormat7(UserLimits.apply)
}
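For illustration only (not part of the diff): with jsonFormat7 the two new fields stay optional on the wire, so existing limit documents without them still deserialize and the values simply default to None.

    import spray.json._
    import org.apache.openwhisk.core.entity.UserLimits

    // Example document; only the fields a namespace actually sets need to be present.
    val limits = """{"warmedContainerKeepingCount": 2,
                     "warmedContainerKeepingTimeout": "60 minutes"}""".parseJson.convertTo[UserLimits]

    // limits.warmedContainerKeepingCount == Some(2)
    // limits.invocationsPerMinute == None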

protected[core] case class Namespace(name: EntityName, uuid: UUID)
1 change: 1 addition & 0 deletions core/invoker/src/main/resources/application.conf
@@ -155,6 +155,7 @@ whisk {
#aka 'How long should a container sit idle until we kill it?'
idle-container = 10 minutes
pause-grace = 50 milliseconds
keeping-duration = 60 minutes
Contributor
Is this the same thing as the idle-container config? Or is the warm container removed even if it has received an activation in the last 60 minutes?

Contributor Author (@ningyougang, Aug 30, 2021)

They are different configurations.

  • idle-container

    If a FunctionPullingContainerProxy actor has already executed some activations but does not receive a new one within the pause grace (e.g. 50 milliseconds), it transitions with goto(Paused) and starts a timer:

    case _ -> Paused   => startSingleTimer(IdleTimeoutName, StateTimeout, idleTimeout)
    

    As we can see, StateTimeout is sent after idleTimeout. StateTimeout is then handled by the code below, which checks whether this FunctionPullingContainerProxy is within the namespace's warmedContainerKeepingCount:

    when(Paused) {
    ...
      case Event(StateTimeout, data: WarmData) =>
        (for {
          count <- getLiveContainerCount(data.invocationNamespace, data.action.fullyQualifiedName(false), data.revision)
          (warmedContainerKeepingCount, warmedContainerKeepingTimeout) <- getWarmedContainerLimit(
            data.invocationNamespace)
        } yield {
          logging.info(
            this,
            s"Live container count: ${count}, warmed container keeping count configuration: ${warmedContainerKeepingCount} in namespace: ${data.invocationNamespace}")
          if (count <= warmedContainerKeepingCount) {
            Keep(warmedContainerKeepingTimeout)
          } else {
            Remove
          }
        }).pipeTo(self)
        stay
    ...
    }
    
  • keeping-duration
    If this FunctionPullingContainerProxy is not within warmedContainerKeepingCount, it is removed;
    otherwise it is kept for an additional keeping-duration (the warmedContainerKeepingTimeout).
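A rough sketch of how the per-namespace values could combine with the keeping-duration default (the helper name and fallback behavior are illustrative assumptions, not the PR's actual getWarmedContainerLimit implementation):

    import scala.concurrent.duration._

    // Assumed helper, for illustration only: prefer the namespace's UserLimits,
    // fall back to a count of 1 and the configured keeping-duration.
    def warmedContainerLimit(limits: Option[UserLimits],
                             defaultKeepingDuration: FiniteDuration): (Int, FiniteDuration) = {
      val count = limits.flatMap(_.warmedContainerKeepingCount).getOrElse(1)
      val timeout = limits
        .flatMap(_.warmedContainerKeepingTimeout)
        .map(Duration(_)) // e.g. "60 minutes"
        .collect { case d: FiniteDuration => d }
        .getOrElse(defaultKeepingDuration)
      (count, timeout)
    }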

}
action-health-check {
enabled = false # if true, prewarm containers will be pinged periodically and warm containers will be pinged once after resumed
@@ -961,7 +961,9 @@ class ContainerProxy(factory: (TransactionId,
}
}

final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, pauseGrace: FiniteDuration)
final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration,
                                             pauseGrace: FiniteDuration,
                                             keepingDuration: FiniteDuration)
final case class ContainerProxyHealthCheckConfig(enabled: Boolean, checkPeriod: FiniteDuration, maxFails: Int)
final case class ContainerProxyActivationErrorLogConfig(applicationErrors: Boolean,
developerErrors: Boolean,
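A minimal loading sketch (assumed usage following the existing pureconfig pattern; the whisk.container-proxy.timeouts path is my assumption of where idle-container, pause-grace, and keeping-duration live in application.conf):

    import scala.concurrent.duration._
    import pureconfig._
    import pureconfig.generic.auto._

    // Kebab-case keys (idle-container, pause-grace, keeping-duration) map onto
    // the camelCase fields of ContainerProxyTimeoutConfig.
    val timeouts: ContainerProxyTimeoutConfig =
      loadConfigOrThrow[ContainerProxyTimeoutConfig]("whisk.container-proxy.timeouts")

    assert(timeouts.keepingDuration == 60.minutes) // with the application.conf default above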