-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Introduce scheduling configurations. #5232
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3b50920
e249318
e848130
63a3f0a
1584d97
cbab6ce
2e585c5
2e82d87
3197612
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -64,6 +64,11 @@ whisk { | |
grpc { | ||
tls = "false" | ||
} | ||
scheduling { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. since this is already under There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These configurations are related to both MemoryQueue and DecisionMaker. |
||
stale-threshold = "100 milliseconds" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Because the value is There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I just thought that since its value includes the unit, we can easily figure out its type. |
||
check-interval = "100 milliseconds" | ||
drop-interval = "10 seconds" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is another config I forgot to mention we'll need to tune, so it's good to see it configurable here. Does 10 seconds make sense as the default if the built-in max action duration is 60 seconds? If the namespace or action is throttled and can't create more containers, then we may be waiting up to 60 seconds to process new activations. On the flip side, I think this is used to be able to stop the scheduler once all queues are drained before stopping, so I also understand why you would want this to be on the lower side. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The drop interval is to periodically check whether there are any stale activations. The retention time is controlled by the queue retention time, which is 60s by default. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I see, okay. Maybe the max retention ms default should be the same as the default completion ack timeout, which I think is three minutes based on the timeout calculation in the code. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK, so do you want me to update the default retention time to 3 mins? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, I think that would be best unless you see a reason why it should be 1 min, since you're more familiar with the architecture. |
||
} | ||
queue { | ||
idle-grace = "20 seconds" | ||
stop-grace = "20 seconds" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ import akka.actor.Status.{Failure => FailureMessage} | |
import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash} | ||
import akka.util.Timeout | ||
import org.apache.openwhisk.common._ | ||
import org.apache.openwhisk.core.ConfigKeys | ||
import org.apache.openwhisk.core.ack.ActiveAck | ||
import org.apache.openwhisk.core.connector.ContainerCreationError.{TooManyConcurrentRequests, ZeroNamespaceLimit} | ||
import org.apache.openwhisk.core.connector._ | ||
|
@@ -33,22 +34,15 @@ import org.apache.openwhisk.core.entity.size._ | |
import org.apache.openwhisk.core.etcd.EtcdClient | ||
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix | ||
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys} | ||
import org.apache.openwhisk.core.scheduler.SchedulerEndpoints | ||
import org.apache.openwhisk.core.scheduler.message.{ | ||
ContainerCreation, | ||
ContainerDeletion, | ||
FailedCreationJob, | ||
SuccessfulCreationJob | ||
} | ||
import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, ActivationResponse => GetActivationResponse} | ||
import org.apache.openwhisk.core.scheduler.message.{ | ||
ContainerCreation, | ||
ContainerDeletion, | ||
FailedCreationJob, | ||
SuccessfulCreationJob | ||
} | ||
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig} | ||
import org.apache.openwhisk.core.service._ | ||
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} | ||
import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests} | ||
import pureconfig.generic.auto._ | ||
import pureconfig.loadConfigOrThrow | ||
|
@@ -59,7 +53,6 @@ import scala.collection.immutable.Queue | |
import scala.collection.mutable | ||
import scala.concurrent.duration._ | ||
import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise} | ||
import scala.language.postfixOps | ||
import scala.util.{Failure, Success} | ||
|
||
// States | ||
|
@@ -116,7 +109,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |
private val durationChecker: DurationChecker, | ||
private val action: FullyQualifiedEntityName, | ||
messagingProducer: MessageProducer, | ||
config: WhiskConfig, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It seems this is no longer being used. |
||
schedulingConfig: SchedulingConfig, | ||
invocationNamespace: String, | ||
revision: DocRevision, | ||
endpoints: SchedulerEndpoints, | ||
|
@@ -144,11 +137,8 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |
private implicit val timeout = Timeout(5.seconds) | ||
private implicit val order: Ordering[BufferedRequest] = Ordering.by(_.containerId) | ||
|
||
private val StaleDuration = Duration.ofMillis(schedulingConfig.staleThreshold.toMillis) | ||
private val unversionedAction = action.copy(version = None) | ||
private val checkInterval: FiniteDuration = 100 milliseconds | ||
private val StaleThreshold: Double = 100.0 | ||
private val StaleDuration = Duration.ofMillis(StaleThreshold.toLong) | ||
private val dropInterval: FiniteDuration = 10 seconds | ||
private val leaderKey = QueueKeys.queue(invocationNamespace, unversionedAction, leader = true) | ||
private val inProgressContainerPrefixKey = | ||
containerPrefix(ContainerKeys.inProgressPrefix, invocationNamespace, action, Some(revision)) | ||
|
@@ -834,7 +824,6 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |
} | ||
} | ||
|
||
|
||
private def handleStaleActivationsWhenActionUpdated(queueManager: ActorRef): Unit = { | ||
if (queue.size > 0) { | ||
// if doesn't exist old container to pull old memoryQueue's activation, send the old activations to queueManager | ||
|
@@ -862,12 +851,12 @@ class MemoryQueue(private val etcdClient: EtcdClient, | |
// since there is no initial delay, it will try to create a container at initialization time | ||
// these schedulers will run forever and stop when the memory queue stops | ||
private def startMonitoring(): (ActorRef, ActorRef) = { | ||
val droppingScheduler = Scheduler.scheduleWaitAtLeast(dropInterval) { () => | ||
val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () => | ||
checkToDropStaleActivation(queue, queueConfig.maxRetentionMs, invocationNamespace, action, stateName, self) | ||
Future.successful(()) | ||
} | ||
|
||
val monitoringScheduler = Scheduler.scheduleWaitAtLeast(checkInterval) { () => | ||
val monitoringScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.checkInterval) { () => | ||
// the average duration is updated every checkInterval | ||
if (averageDurationBuffer.nonEmpty) { | ||
averageDuration = Some(averageDurationBuffer.average) | ||
|
@@ -1048,7 +1037,7 @@ object MemoryQueue { | |
durationChecker: DurationChecker, | ||
fqn: FullyQualifiedEntityName, | ||
messagingProducer: MessageProducer, | ||
config: WhiskConfig, | ||
schedulingConfig: SchedulingConfig, | ||
invocationNamespace: String, | ||
revision: DocRevision, | ||
endpoints: SchedulerEndpoints, | ||
|
@@ -1067,7 +1056,7 @@ object MemoryQueue { | |
durationChecker, | ||
fqn: FullyQualifiedEntityName, | ||
messagingProducer: MessageProducer, | ||
config: WhiskConfig, | ||
schedulingConfig: SchedulingConfig, | ||
invocationNamespace: String, | ||
revision, | ||
endpoints: SchedulerEndpoints, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -83,9 +83,11 @@ class DefaultInvokerServerTests | |
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword) | ||
Get(s"/isEnabled") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { | ||
status should be(OK) | ||
Unmarshal(responseEntity).to[String].map(response => { | ||
InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true) | ||
}) | ||
Unmarshal(responseEntity) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I am unclear why scalafmt suddenly requires this kind of change. |
||
.to[String] | ||
.map(response => { | ||
InvokerEnabled.parseJson(response) shouldEqual InvokerEnabled(true) | ||
}) | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about
dropStaleActivationTime
for this config? Or is the config here the interval at which to check whether there are stale activations to drop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just an interval to check to see if there are any stale activations.