Skip to content

Introduce scheduling configurations. #5232

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,10 @@ scheduler:
inProgressJobRetention: "{{ scheduler_inProgressJobRetention | default('20 seconds') }}"
managedFraction: "{{ scheduler_managed_fraction | default(1.0 - (scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction))) }}"
blackboxFraction: "{{ scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction) }}"
scheduling:
staleThreshold: "{{ scheduler_scheduling_staleThreshold | default('100 milliseconds') }}"
checkInterval: "{{ scheduler_scheduling_checkInterval | default('100 milliseconds') }}"
dropInterval: "{{ scheduler_scheduling_dropInterval | default('10 seconds') }}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about dropStaleActivationTime for this config? Or is the config here the interval at which to check whether there are stale activations to drop

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just an interval to check to see if there are any stale activations.

queueManager:
maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second') }}"
maxRetriesToGetQueue: "{{ scheduler_maxRetriesToGetQueue | default(13) }}"
Expand Down
3 changes: 3 additions & 0 deletions ansible/roles/schedulers/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@
"CONFIG_whisk_scheduler_maxPeek": "{{ scheduler.maxPeek }}"
"CONFIG_whisk_scheduler_dataManagementService_retryInterval": "{{ scheduler.dataManagementService.retryInterval }}"
"CONFIG_whisk_scheduler_inProgressJobRetention": "{{ scheduler.inProgressJobRetention }}"
"CONFIG_whisk_scheduler_scheduling_staleThreshold": "{{ scheduler.scheduling.staleThreshold }}"
"CONFIG_whisk_scheduler_scheduling_checkInterval": "{{ scheduler.scheduling.checkInterval }}"
"CONFIG_whisk_scheduler_scheduling_dropInterval": "{{ scheduler.scheduling.dropInterval }}"
"CONFIG_whisk_scheduler_queueManager_maxSchedulingTime": "{{ scheduler.queueManager.maxSchedulingTime }}"
"CONFIG_whisk_scheduler_queueManager_maxRetriesToGetQueue": "{{ scheduler.queueManager.maxRetriesToGetQueue }}"
"CONFIG_whisk_scheduler_queue_idleGrace": "{{ scheduler.queue.idleGrace }}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,11 @@ object ConfigKeys {

val schedulerGrpcService = "whisk.scheduler.grpc"
val schedulerMaxPeek = "whisk.scheduler.max-peek"
val schedulerScheduling = "whisk.scheduler.scheduling"
val schedulerQueue = "whisk.scheduler.queue"
val schedulerQueueManager = "whisk.scheduler.queue-manager"
val schedulerInProgressJobRetention = "whisk.scheduler.in-progress-job-retention"
val schedulerStaleThreshold = "whisk.scheduler.stale-threshold"

val whiskClusterName = "whisk.cluster.name"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ package org.apache.openwhisk.core.database.memory
import java.time.Instant
import akka.actor.ActorSystem
import org.apache.openwhisk.common.{Logging, PrintStreamLogging, TransactionId, WhiskInstants}
import org.apache.openwhisk.core.database.{ActivationStore, ActivationStoreProvider, CacheChangeNotification, UserContext}
import org.apache.openwhisk.core.database.{
ActivationStore,
ActivationStoreProvider,
CacheChangeNotification,
UserContext
}
import org.apache.openwhisk.core.entity.{ActivationId, DocInfo, EntityName, EntityPath, Subject, WhiskActivation}
import spray.json.{JsNumber, JsObject}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,17 +639,16 @@ class FPCPoolBalancer(config: WhiskConfig,
MetricEmitter.emitGaugeMetric(UNHEALTHY_INVOKERS, invokers.count(_.status == Unhealthy))
MetricEmitter.emitGaugeMetric(OFFLINE_INVOKERS, invokers.count(_.status == Offline))
// Add both user memory and busy memory because user memory represents free memory in this case
MetricEmitter.emitGaugeMetric(
INVOKER_TOTALMEM,
invokers.foldLeft(0L) { (total, curr) =>
if (curr.status.isUsable) {
curr.id.userMemory.toMB + curr.id.busyMemory.getOrElse(ByteSize(0, SizeUnits.BYTE)).toMB + total
} else {
total
}
})
MetricEmitter.emitGaugeMetric(INVOKER_TOTALMEM, invokers.foldLeft(0L) { (total, curr) =>
if (curr.status.isUsable) {
curr.id.userMemory.toMB + curr.id.busyMemory.getOrElse(ByteSize(0, SizeUnits.BYTE)).toMB + total
} else {
total
}
})
MetricEmitter.emitGaugeMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), totalActivations.longValue)
MetricEmitter.emitGaugeMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""), totalActivationMemory.longValue)
MetricEmitter
.emitGaugeMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""), totalActivationMemory.longValue)
})
}

Expand Down
5 changes: 5 additions & 0 deletions core/scheduler/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ whisk {
grpc {
tls = "false"
}
scheduling {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this is already under scheduler I think this isn't providing any more descriptiveness. How about calling the config decision-maker?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These configurations are related to both MemoryQueue and DecisionMaker.
The two intervals are not used by the decision-maker.

stale-threshold = "100 milliseconds"
Copy link
Contributor

@ningyougang ningyougang May 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the value is FiniteDuration type, how about change it to stale-duration-threshold?
(btw, check-interval or drop-interval, from the name, we can know it is a FiniteDuration type easily)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just thought its value includes the unit so we can easily figure out its type.

check-interval = "100 milliseconds"
drop-interval = "10 seconds"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another config I forgot to mention we'll need to tune so good to see it configurable here. Does 10 seconds make sense as the default if the built in max action duration is 60 seconds? If the namespace or action is throttled and can't create more containers, then may be waiting up to 60 seconds to process new activations. On the flip side I think this is used to be able to stop the scheduler once all queues are drained before stopping so I also understand why you would want this to be on the lower side

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The drop interval is to periodically check whether there are any stale activations.
https://github.com/apache/openwhisk/blob/master/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala#L865

The retention time is controlled by the queue retention time which is 60s by default.
https://github.com/apache/openwhisk/blob/master/core/scheduler/src/main/resources/application.conf#L73

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see okay. Maybe the max retention ms default should be the same as the default completion ack timeout which I think is three minutes based on the timeout calculation in code

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, so do you want me to update the default retention time to 3 mins?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I think that would be best unless you see a reason why it should be 1 min since you're more familiar with the architecture

}
queue {
idle-grace = "20 seconds"
stop-grace = "20 seconds"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
val leaseService =
actorSystem.actorOf(LeaseKeepAliveService.props(etcdClient, schedulerId, watcherService))

val schedulingConfig = loadConfigOrThrow[SchedulingConfig](ConfigKeys.schedulerQueue)

implicit val entityStore = WhiskEntityStore.datastore()
private val activationStore =
SpiLoader.get[ActivationStoreProvider].instance(actorSystem, logging)
Expand Down Expand Up @@ -191,15 +193,15 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
: (ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, WhiskActionMetaData) => ActorRef =
(factory, invocationNamespace, fqn, revision, actionMetaData) => {
// Todo: Change this to SPI
val decisionMaker = factory.actorOf(SchedulingDecisionMaker.props(invocationNamespace, fqn))
val decisionMaker = factory.actorOf(SchedulingDecisionMaker.props(invocationNamespace, fqn, schedulingConfig))

factory.actorOf(
MemoryQueue.props(
etcdClient,
durationChecker,
fqn,
producer,
config,
schedulingConfig,
invocationNamespace,
revision,
schedulerEndpoints,
Expand Down Expand Up @@ -395,3 +397,5 @@ object SchedulerStates extends DefaultJsonProtocol {

def parse(states: String) = Try(serdes.read(states.parseJson))
}

case class SchedulingConfig(staleThreshold: FiniteDuration, checkInterval: FiniteDuration, dropInterval: FiniteDuration)
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
// Filter out messages which can use warmed container
private def filterWarmedCreations(msgs: List[ContainerCreationMessage]) = {
msgs.filter { msg =>
val warmedPrefix = containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision))
val warmedPrefix =
containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision))
val chosenInvoker = warmedContainers
.filter(!inProgressWarmedContainers.values.toSeq.contains(_))
.find { container =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
import akka.util.Timeout
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.ack.ActiveAck
import org.apache.openwhisk.core.connector.ContainerCreationError.{TooManyConcurrentRequests, ZeroNamespaceLimit}
import org.apache.openwhisk.core.connector._
Expand All @@ -33,22 +34,15 @@ import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys}
import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
import org.apache.openwhisk.core.scheduler.message.{
ContainerCreation,
ContainerDeletion,
FailedCreationJob,
SuccessfulCreationJob
}
import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, ActivationResponse => GetActivationResponse}
import org.apache.openwhisk.core.scheduler.message.{
ContainerCreation,
ContainerDeletion,
FailedCreationJob,
SuccessfulCreationJob
}
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig}
import org.apache.openwhisk.core.service._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests}
import pureconfig.generic.auto._
import pureconfig.loadConfigOrThrow
Expand All @@ -59,7 +53,6 @@ import scala.collection.immutable.Queue
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise}
import scala.language.postfixOps
import scala.util.{Failure, Success}

// States
Expand Down Expand Up @@ -116,7 +109,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
private val durationChecker: DurationChecker,
private val action: FullyQualifiedEntityName,
messagingProducer: MessageProducer,
config: WhiskConfig,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this is no longer being used.

schedulingConfig: SchedulingConfig,
invocationNamespace: String,
revision: DocRevision,
endpoints: SchedulerEndpoints,
Expand Down Expand Up @@ -144,11 +137,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
private implicit val timeout = Timeout(5.seconds)
private implicit val order: Ordering[BufferedRequest] = Ordering.by(_.containerId)

private val StaleDuration = Duration.ofMillis(schedulingConfig.staleThreshold.toMillis)
private val unversionedAction = action.copy(version = None)
private val checkInterval: FiniteDuration = 100 milliseconds
private val StaleThreshold: Double = 100.0
private val StaleDuration = Duration.ofMillis(StaleThreshold.toLong)
private val dropInterval: FiniteDuration = 10 seconds
private val leaderKey = QueueKeys.queue(invocationNamespace, unversionedAction, leader = true)
private val inProgressContainerPrefixKey =
containerPrefix(ContainerKeys.inProgressPrefix, invocationNamespace, action, Some(revision))
Expand Down Expand Up @@ -834,7 +824,6 @@ class MemoryQueue(private val etcdClient: EtcdClient,
}
}


private def handleStaleActivationsWhenActionUpdated(queueManager: ActorRef): Unit = {
if (queue.size > 0) {
// if doesn't exist old container to pull old memoryQueue's activation, send the old activations to queueManager
Expand Down Expand Up @@ -862,12 +851,12 @@ class MemoryQueue(private val etcdClient: EtcdClient,
// since there is no initial delay, it will try to create a container at initialization time
// these schedulers will run forever and stop when the memory queue stops
private def startMonitoring(): (ActorRef, ActorRef) = {
val droppingScheduler = Scheduler.scheduleWaitAtLeast(dropInterval) { () =>
val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () =>
checkToDropStaleActivation(queue, queueConfig.maxRetentionMs, invocationNamespace, action, stateName, self)
Future.successful(())
}

val monitoringScheduler = Scheduler.scheduleWaitAtLeast(checkInterval) { () =>
val monitoringScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.checkInterval) { () =>
// the average duration is updated every checkInterval
if (averageDurationBuffer.nonEmpty) {
averageDuration = Some(averageDurationBuffer.average)
Expand Down Expand Up @@ -1048,7 +1037,7 @@ object MemoryQueue {
durationChecker: DurationChecker,
fqn: FullyQualifiedEntityName,
messagingProducer: MessageProducer,
config: WhiskConfig,
schedulingConfig: SchedulingConfig,
invocationNamespace: String,
revision: DocRevision,
endpoints: SchedulerEndpoints,
Expand All @@ -1067,7 +1056,7 @@ object MemoryQueue {
durationChecker,
fqn: FullyQualifiedEntityName,
messagingProducer: MessageProducer,
config: WhiskConfig,
schedulingConfig: SchedulingConfig,
invocationNamespace: String,
revision,
endpoints: SchedulerEndpoints,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@ package org.apache.openwhisk.core.scheduler.queue
import akka.actor.{Actor, ActorSystem, Props}
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.entity.FullyQualifiedEntityName
import org.apache.openwhisk.core.scheduler.SchedulingConfig

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

class SchedulingDecisionMaker(
invocationNamespace: String,
action: FullyQualifiedEntityName,
StaleThreshold: Double = 100.0)(implicit val actorSystem: ActorSystem, ec: ExecutionContext, logging: Logging)
schedulingConfig: SchedulingConfig)(implicit val actorSystem: ActorSystem, ec: ExecutionContext, logging: Logging)
extends Actor {

private val staleThreshold: Double = schedulingConfig.staleThreshold.toMillis.toDouble

override def receive: Receive = {
case msg: QueueSnapshot =>
decide(msg)
Expand Down Expand Up @@ -135,7 +138,7 @@ class SchedulingDecisionMaker(

case (Running, Some(duration)) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
val containerThroughput = StaleThreshold / duration
val containerThroughput = staleThreshold / duration
val num = ceiling(availableMsg.toDouble / containerThroughput)
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = (if (num > availableMsg) availableMsg else num) - inProgress
Expand All @@ -153,7 +156,7 @@ class SchedulingDecisionMaker(
// need more containers and a message is already processed
case (Running, Some(duration)) =>
// we can safely get the value as we already checked the existence
val containerThroughput = StaleThreshold / duration
val containerThroughput = staleThreshold / duration
val expectedTps = containerThroughput * (existing + inProgress)

if (availableMsg >= expectedTps && existing + inProgress < availableMsg) {
Expand All @@ -180,7 +183,7 @@ class SchedulingDecisionMaker(
// this case is for that as a last resort.
case (Removing, Some(duration)) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
val containerThroughput = StaleThreshold / duration
val containerThroughput = staleThreshold / duration
val num = ceiling(availableMsg.toDouble / containerThroughput)
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = (if (num > availableMsg) availableMsg else num) - inProgress
Expand Down Expand Up @@ -252,10 +255,10 @@ class SchedulingDecisionMaker(
}

object SchedulingDecisionMaker {
def props(invocationNamespace: String, action: FullyQualifiedEntityName, StaleThreshold: Double = 100.0)(
def props(invocationNamespace: String, action: FullyQualifiedEntityName, schedulingConfig: SchedulingConfig)(
implicit actorSystem: ActorSystem,
ec: ExecutionContext,
logging: Logging): Props = {
Props(new SchedulingDecisionMaker(invocationNamespace, action, StaleThreshold))
Props(new SchedulingDecisionMaker(invocationNamespace, action, schedulingConfig))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.testcontainers.containers.MongoDBContainer
import pureconfig.loadConfigOrThrow
import pureconfig.generic.auto._

import scala.reflect.{ClassTag, classTag}
import scala.reflect.{classTag, ClassTag}

trait MongoDBStoreBehaviorBase extends FlatSpec with ArtifactStoreBehaviorBase {
val imageName = loadConfigOrThrow[String]("whisk.mongodb.docker-image")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ class DefaultInvokerServerTests
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
status should be(OK)
Unmarshal(responseEntity).to[String].map(response => {
InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
})
Unmarshal(responseEntity)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am unclear why scalaFmt suddenly requires this kind of change.

.to[String]
.map(response => {
InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
})
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ class FPCInvokerServerTests
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
status should be(OK)
Unmarshal(responseEntity).to[String].map(response => {
InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
})
Unmarshal(responseEntity)
.to[String]
.map(response => {
InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true)
})
}
}

Expand Down
Loading