Skip to content

Commit 32ceac6

Browse files
style95JesseStutler
authored andcommitted
Introduce scheduling configurations. (apache#5232)
* Introduce scheduling configurations. * Apply SchedulingConfig to MemoryQueue. * Apply SchedulingConfig to SchedulingDecisionMaker. * Apply ScalaFmt * Remove unused import * Change configs. * Fix test cases. * Apply scalaFmt * Remove Java8-compat dependency.
1 parent 3c6afd9 commit 32ceac6

File tree

17 files changed

+159
-123
lines changed

17 files changed

+159
-123
lines changed

ansible/group_vars/all

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,10 @@ scheduler:
515515
inProgressJobRetention: "{{ scheduler_inProgressJobRetention | default('20 seconds') }}"
516516
managedFraction: "{{ scheduler_managed_fraction | default(1.0 - (scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction))) }}"
517517
blackboxFraction: "{{ scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction) }}"
518+
scheduling:
519+
staleThreshold: "{{ scheduler_scheduling_staleThreshold | default('100 milliseconds') }}"
520+
checkInterval: "{{ scheduler_scheduling_checkInterval | default('100 milliseconds') }}"
521+
dropInterval: "{{ scheduler_scheduling_dropInterval | default('10 seconds') }}"
518522
queueManager:
519523
maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second') }}"
520524
maxRetriesToGetQueue: "{{ scheduler_maxRetriesToGetQueue | default(13) }}"

ansible/roles/schedulers/tasks/deploy.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@
113113
"CONFIG_whisk_scheduler_maxPeek": "{{ scheduler.maxPeek }}"
114114
"CONFIG_whisk_scheduler_dataManagementService_retryInterval": "{{ scheduler.dataManagementService.retryInterval }}"
115115
"CONFIG_whisk_scheduler_inProgressJobRetention": "{{ scheduler.inProgressJobRetention }}"
116+
"CONFIG_whisk_scheduler_scheduling_staleThreshold": "{{ scheduler.scheduling.staleThreshold }}"
117+
"CONFIG_whisk_scheduler_scheduling_checkInterval": "{{ scheduler.scheduling.checkInterval }}"
118+
"CONFIG_whisk_scheduler_scheduling_dropInterval": "{{ scheduler.scheduling.dropInterval }}"
116119
"CONFIG_whisk_scheduler_queueManager_maxSchedulingTime": "{{ scheduler.queueManager.maxSchedulingTime }}"
117120
"CONFIG_whisk_scheduler_queueManager_maxRetriesToGetQueue": "{{ scheduler.queueManager.maxRetriesToGetQueue }}"
118121
"CONFIG_whisk_scheduler_queue_idleGrace": "{{ scheduler.queue.idleGrace }}"

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,9 +301,11 @@ object ConfigKeys {
301301

302302
val schedulerGrpcService = "whisk.scheduler.grpc"
303303
val schedulerMaxPeek = "whisk.scheduler.max-peek"
304+
val schedulerScheduling = "whisk.scheduler.scheduling"
304305
val schedulerQueue = "whisk.scheduler.queue"
305306
val schedulerQueueManager = "whisk.scheduler.queue-manager"
306307
val schedulerInProgressJobRetention = "whisk.scheduler.in-progress-job-retention"
308+
val schedulerStaleThreshold = "whisk.scheduler.stale-threshold"
307309

308310
val whiskClusterName = "whisk.cluster.name"
309311

common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@ package org.apache.openwhisk.core.database.memory
2020
import java.time.Instant
2121
import akka.actor.ActorSystem
2222
import org.apache.openwhisk.common.{Logging, PrintStreamLogging, TransactionId, WhiskInstants}
23-
import org.apache.openwhisk.core.database.{ActivationStore, ActivationStoreProvider, CacheChangeNotification, UserContext}
23+
import org.apache.openwhisk.core.database.{
24+
ActivationStore,
25+
ActivationStoreProvider,
26+
CacheChangeNotification,
27+
UserContext
28+
}
2429
import org.apache.openwhisk.core.entity.{ActivationId, DocInfo, EntityName, EntityPath, Subject, WhiskActivation}
2530
import spray.json.{JsNumber, JsObject}
2631

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -639,17 +639,16 @@ class FPCPoolBalancer(config: WhiskConfig,
639639
MetricEmitter.emitGaugeMetric(UNHEALTHY_INVOKERS, invokers.count(_.status == Unhealthy))
640640
MetricEmitter.emitGaugeMetric(OFFLINE_INVOKERS, invokers.count(_.status == Offline))
641641
// Add both user memory and busy memory because user memory represents free memory in this case
642-
MetricEmitter.emitGaugeMetric(
643-
INVOKER_TOTALMEM,
644-
invokers.foldLeft(0L) { (total, curr) =>
645-
if (curr.status.isUsable) {
646-
curr.id.userMemory.toMB + curr.id.busyMemory.getOrElse(ByteSize(0, SizeUnits.BYTE)).toMB + total
647-
} else {
648-
total
649-
}
650-
})
642+
MetricEmitter.emitGaugeMetric(INVOKER_TOTALMEM, invokers.foldLeft(0L) { (total, curr) =>
643+
if (curr.status.isUsable) {
644+
curr.id.userMemory.toMB + curr.id.busyMemory.getOrElse(ByteSize(0, SizeUnits.BYTE)).toMB + total
645+
} else {
646+
total
647+
}
648+
})
651649
MetricEmitter.emitGaugeMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), totalActivations.longValue)
652-
MetricEmitter.emitGaugeMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""), totalActivationMemory.longValue)
650+
MetricEmitter
651+
.emitGaugeMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""), totalActivationMemory.longValue)
653652
})
654653
}
655654

core/scheduler/src/main/resources/application.conf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ whisk {
6464
grpc {
6565
tls = "false"
6666
}
67+
scheduling {
68+
stale-threshold = "100 milliseconds"
69+
check-interval = "100 milliseconds"
70+
drop-interval = "10 seconds"
71+
}
6772
queue {
6873
idle-grace = "20 seconds"
6974
stop-grace = "20 seconds"

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
7070
val leaseService =
7171
actorSystem.actorOf(LeaseKeepAliveService.props(etcdClient, schedulerId, watcherService))
7272

73+
val schedulingConfig = loadConfigOrThrow[SchedulingConfig](ConfigKeys.schedulerQueue)
74+
7375
implicit val entityStore = WhiskEntityStore.datastore()
7476
private val activationStore =
7577
SpiLoader.get[ActivationStoreProvider].instance(actorSystem, logging)
@@ -191,15 +193,15 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
191193
: (ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, WhiskActionMetaData) => ActorRef =
192194
(factory, invocationNamespace, fqn, revision, actionMetaData) => {
193195
// Todo: Change this to SPI
194-
val decisionMaker = factory.actorOf(SchedulingDecisionMaker.props(invocationNamespace, fqn))
196+
val decisionMaker = factory.actorOf(SchedulingDecisionMaker.props(invocationNamespace, fqn, schedulingConfig))
195197

196198
factory.actorOf(
197199
MemoryQueue.props(
198200
etcdClient,
199201
durationChecker,
200202
fqn,
201203
producer,
202-
config,
204+
schedulingConfig,
203205
invocationNamespace,
204206
revision,
205207
schedulerEndpoints,
@@ -395,3 +397,5 @@ object SchedulerStates extends DefaultJsonProtocol {
395397

396398
def parse(states: String) = Try(serdes.read(states.parseJson))
397399
}
400+
401+
case class SchedulingConfig(staleThreshold: FiniteDuration, checkInterval: FiniteDuration, dropInterval: FiniteDuration)

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,8 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
251251
// Filter out messages which can use warmed container
252252
private def filterWarmedCreations(msgs: List[ContainerCreationMessage]) = {
253253
msgs.filter { msg =>
254-
val warmedPrefix = containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision))
254+
val warmedPrefix =
255+
containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision))
255256
val chosenInvoker = warmedContainers
256257
.filter(!inProgressWarmedContainers.values.toSeq.contains(_))
257258
.find { container =>

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import akka.actor.Status.{Failure => FailureMessage}
2424
import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
2525
import akka.util.Timeout
2626
import org.apache.openwhisk.common._
27+
import org.apache.openwhisk.core.ConfigKeys
2728
import org.apache.openwhisk.core.ack.ActiveAck
2829
import org.apache.openwhisk.core.connector.ContainerCreationError.{TooManyConcurrentRequests, ZeroNamespaceLimit}
2930
import org.apache.openwhisk.core.connector._
@@ -33,22 +34,15 @@ import org.apache.openwhisk.core.entity.size._
3334
import org.apache.openwhisk.core.etcd.EtcdClient
3435
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
3536
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys}
36-
import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
37-
import org.apache.openwhisk.core.scheduler.message.{
38-
ContainerCreation,
39-
ContainerDeletion,
40-
FailedCreationJob,
41-
SuccessfulCreationJob
42-
}
4337
import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, ActivationResponse => GetActivationResponse}
4438
import org.apache.openwhisk.core.scheduler.message.{
4539
ContainerCreation,
4640
ContainerDeletion,
4741
FailedCreationJob,
4842
SuccessfulCreationJob
4943
}
44+
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig}
5045
import org.apache.openwhisk.core.service._
51-
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
5246
import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests}
5347
import pureconfig.generic.auto._
5448
import pureconfig.loadConfigOrThrow
@@ -59,7 +53,6 @@ import scala.collection.immutable.Queue
5953
import scala.collection.mutable
6054
import scala.concurrent.duration._
6155
import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise}
62-
import scala.language.postfixOps
6356
import scala.util.{Failure, Success}
6457

6558
// States
@@ -116,7 +109,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
116109
private val durationChecker: DurationChecker,
117110
private val action: FullyQualifiedEntityName,
118111
messagingProducer: MessageProducer,
119-
config: WhiskConfig,
112+
schedulingConfig: SchedulingConfig,
120113
invocationNamespace: String,
121114
revision: DocRevision,
122115
endpoints: SchedulerEndpoints,
@@ -144,11 +137,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
144137
private implicit val timeout = Timeout(5.seconds)
145138
private implicit val order: Ordering[BufferedRequest] = Ordering.by(_.containerId)
146139

140+
private val StaleDuration = Duration.ofMillis(schedulingConfig.staleThreshold.toMillis)
147141
private val unversionedAction = action.copy(version = None)
148-
private val checkInterval: FiniteDuration = 100 milliseconds
149-
private val StaleThreshold: Double = 100.0
150-
private val StaleDuration = Duration.ofMillis(StaleThreshold.toLong)
151-
private val dropInterval: FiniteDuration = 10 seconds
152142
private val leaderKey = QueueKeys.queue(invocationNamespace, unversionedAction, leader = true)
153143
private val inProgressContainerPrefixKey =
154144
containerPrefix(ContainerKeys.inProgressPrefix, invocationNamespace, action, Some(revision))
@@ -834,7 +824,6 @@ class MemoryQueue(private val etcdClient: EtcdClient,
834824
}
835825
}
836826

837-
838827
private def handleStaleActivationsWhenActionUpdated(queueManager: ActorRef): Unit = {
839828
if (queue.size > 0) {
840829
// if doesn't exist old container to pull old memoryQueue's activation, send the old activations to queueManager
@@ -862,12 +851,12 @@ class MemoryQueue(private val etcdClient: EtcdClient,
862851
// since there is no initial delay, it will try to create a container at initialization time
863852
// these schedulers will run forever and stop when the memory queue stops
864853
private def startMonitoring(): (ActorRef, ActorRef) = {
865-
val droppingScheduler = Scheduler.scheduleWaitAtLeast(dropInterval) { () =>
854+
val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () =>
866855
checkToDropStaleActivation(queue, queueConfig.maxRetentionMs, invocationNamespace, action, stateName, self)
867856
Future.successful(())
868857
}
869858

870-
val monitoringScheduler = Scheduler.scheduleWaitAtLeast(checkInterval) { () =>
859+
val monitoringScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.checkInterval) { () =>
871860
// the average duration is updated every checkInterval
872861
if (averageDurationBuffer.nonEmpty) {
873862
averageDuration = Some(averageDurationBuffer.average)
@@ -1048,7 +1037,7 @@ object MemoryQueue {
10481037
durationChecker: DurationChecker,
10491038
fqn: FullyQualifiedEntityName,
10501039
messagingProducer: MessageProducer,
1051-
config: WhiskConfig,
1040+
schedulingConfig: SchedulingConfig,
10521041
invocationNamespace: String,
10531042
revision: DocRevision,
10541043
endpoints: SchedulerEndpoints,
@@ -1067,7 +1056,7 @@ object MemoryQueue {
10671056
durationChecker,
10681057
fqn: FullyQualifiedEntityName,
10691058
messagingProducer: MessageProducer,
1070-
config: WhiskConfig,
1059+
schedulingConfig: SchedulingConfig,
10711060
invocationNamespace: String,
10721061
revision,
10731062
endpoints: SchedulerEndpoints,

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,19 @@ package org.apache.openwhisk.core.scheduler.queue
2020
import akka.actor.{Actor, ActorSystem, Props}
2121
import org.apache.openwhisk.common.Logging
2222
import org.apache.openwhisk.core.entity.FullyQualifiedEntityName
23+
import org.apache.openwhisk.core.scheduler.SchedulingConfig
2324

2425
import scala.concurrent.{ExecutionContext, Future}
2526
import scala.util.{Failure, Success}
2627

2728
class SchedulingDecisionMaker(
2829
invocationNamespace: String,
2930
action: FullyQualifiedEntityName,
30-
StaleThreshold: Double = 100.0)(implicit val actorSystem: ActorSystem, ec: ExecutionContext, logging: Logging)
31+
schedulingConfig: SchedulingConfig)(implicit val actorSystem: ActorSystem, ec: ExecutionContext, logging: Logging)
3132
extends Actor {
3233

34+
private val staleThreshold: Double = schedulingConfig.staleThreshold.toMillis.toDouble
35+
3336
override def receive: Receive = {
3437
case msg: QueueSnapshot =>
3538
decide(msg)
@@ -135,7 +138,7 @@ class SchedulingDecisionMaker(
135138

136139
case (Running, Some(duration)) if staleActivationNum > 0 =>
137140
// we can safely get the value as we already checked the existence
138-
val containerThroughput = StaleThreshold / duration
141+
val containerThroughput = staleThreshold / duration
139142
val num = ceiling(availableMsg.toDouble / containerThroughput)
140143
// if it tries to create more containers than existing messages, we just create shortage
141144
val actualNum = (if (num > availableMsg) availableMsg else num) - inProgress
@@ -153,7 +156,7 @@ class SchedulingDecisionMaker(
153156
// need more containers and a message is already processed
154157
case (Running, Some(duration)) =>
155158
// we can safely get the value as we already checked the existence
156-
val containerThroughput = StaleThreshold / duration
159+
val containerThroughput = staleThreshold / duration
157160
val expectedTps = containerThroughput * (existing + inProgress)
158161

159162
if (availableMsg >= expectedTps && existing + inProgress < availableMsg) {
@@ -180,7 +183,7 @@ class SchedulingDecisionMaker(
180183
// this case is for that as a last resort.
181184
case (Removing, Some(duration)) if staleActivationNum > 0 =>
182185
// we can safely get the value as we already checked the existence
183-
val containerThroughput = StaleThreshold / duration
186+
val containerThroughput = staleThreshold / duration
184187
val num = ceiling(availableMsg.toDouble / containerThroughput)
185188
// if it tries to create more containers than existing messages, we just create shortage
186189
val actualNum = (if (num > availableMsg) availableMsg else num) - inProgress
@@ -252,10 +255,10 @@ class SchedulingDecisionMaker(
252255
}
253256

254257
object SchedulingDecisionMaker {
255-
def props(invocationNamespace: String, action: FullyQualifiedEntityName, StaleThreshold: Double = 100.0)(
258+
def props(invocationNamespace: String, action: FullyQualifiedEntityName, schedulingConfig: SchedulingConfig)(
256259
implicit actorSystem: ActorSystem,
257260
ec: ExecutionContext,
258261
logging: Logging): Props = {
259-
Props(new SchedulingDecisionMaker(invocationNamespace, action, StaleThreshold))
262+
Props(new SchedulingDecisionMaker(invocationNamespace, action, schedulingConfig))
260263
}
261264
}

0 commit comments

Comments
 (0)