Enable the scheduler by default #5463
Changes from all commits: 7915509, 781ee24, 992c05c, b4b3fde, 935e673, ba331e0, 8a15023, d169032, 5355cb5, 60cb23f, 97632ac, 842c7f3, bcf8d2f
@@ -65,6 +65,8 @@ jobs:
           sudo rm -rf "$AGENT_TOOLSDIRECTORY"
       - name: Check free space
         run: df -h
+      - name: Disable the scheduler
+        run: "./tools/github/disable-scheduler.sh"
       - id: tests
         name: Run Tests
         run: "./tools/github/run${{ env.TEST_SUITE }}Tests.sh"

Review comment: The standalone mode is only supported with ShardingPoolBalancer, so we need to disable the scheduler for standalone tests.

Reply: No objections to the change, but do you think the standalone mode could be supported with the new scheduler, or should we be looking to deprecate standalone mode? Long term, I'd think we would want to consolidate down to a single scheduler (the new one) to simplify maintenance.

Reply: Yes, that would be great to discuss and we can handle it in another issue if necessary. I am still unclear if we can support the standalone mode with the scheduler, or even if it would be efficient to enable the scheduler in the standalone mode.
@@ -195,12 +195,13 @@ Set the value of pause-grace to 10s by default
 .
 ```
 
-#### Enable the scheduler
-Make sure you enable the scheduler by configuring `scheduler_enable`.
+#### Disable the scheduler
+You can disable the scheduler by configuring `scheduler_enable`.
+The scheduler is enabled by default.
 
 **ansible/environments/local/group_vars/all**
 ```yaml
-scheduler_enable: true
+scheduler_enable: false
 ```
 
 #### [Optional] Enable ElasticSearch Activation Store

Review comment: Now, the scheduler is enabled by default.
@@ -23,12 +23,12 @@ whisk.spi {
   MessagingProvider = org.apache.openwhisk.connector.kafka.KafkaMessagingProvider
   ContainerFactoryProvider = org.apache.openwhisk.core.containerpool.docker.DockerContainerFactoryProvider
   LogStoreProvider = org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider
-  LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.ShardingContainerPoolBalancer
-  EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.LocalEntitlementProvider
+  LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.FPCPoolBalancer
+  EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.FPCEntitlementProvider
   AuthenticationDirectiveProvider = org.apache.openwhisk.core.controller.BasicAuthenticationDirective
-  InvokerProvider = org.apache.openwhisk.core.invoker.InvokerReactive
-  InvokerServerProvider = org.apache.openwhisk.core.invoker.DefaultInvokerServer
-  DurationCheckerProvider = org.apache.openwhisk.core.scheduler.queue.NoopDurationCheckerProvider
+  InvokerProvider = org.apache.openwhisk.core.invoker.FPCInvokerReactive
+  InvokerServerProvider = org.apache.openwhisk.core.invoker.FPCInvokerServer
+  DurationCheckerProvider = org.apache.openwhisk.core.scheduler.queue.ElasticSearchDurationCheckerProvider
 }
 
 dispatchers {

Review comment: Scheduler-related SPI providers are used by default.
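Roughly speaking, each SPI implementation is resolved from the fully qualified class name configured under `whisk.spi`, so switching the defaults above is a pure configuration change. Below is a minimal sketch of that idea using the Typesafe Config library directly; the object name is made up for illustration, and the real SPI loader also instantiates the named class rather than just printing it.

```scala
import com.typesafe.config.ConfigFactory

// Sketch: inspect which providers are configured under whisk.spi in the merged
// reference.conf/application.conf on the classpath.
object SpiLookupSketch {
  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.load()
    val loadBalancer = conf.getString("whisk.spi.LoadBalancerProvider")
    val invoker = conf.getString("whisk.spi.InvokerProvider")
    println(s"LoadBalancerProvider = $loadBalancer")
    println(s"InvokerProvider      = $invoker")
  }
}
```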
@@ -31,7 +31,7 @@ whisk {
   timeout-addon = 1m
 
   fpc {
-    use-per-min-throttles = false
+    use-per-min-throttles = true
   }
 }
 controller {

Review comment: This is required to pass per-minute throttling tests.
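For reference, the flag lives under `whisk.fpc`, and it can be read the same way other settings are loaded in these tests (for example `loadConfigOrThrow[ContainerPoolConfig]` further below). A small sketch, assuming the pureconfig version in the build still exposes `loadConfigOrThrow` and noting that the `FpcFlags` case class here is hypothetical:

```scala
import pureconfig.loadConfigOrThrow
import pureconfig.generic.auto._

// Hypothetical mirror of the whisk.fpc block above; pureconfig's default kebab-case
// mapping reads use-per-min-throttles into usePerMinThrottles.
case class FpcFlags(usePerMinThrottles: Boolean)

object PerMinThrottleCheck {
  def main(args: Array[String]): Unit = {
    // Loads from the merged reference.conf/application.conf on the classpath.
    val flags = loadConfigOrThrow[FpcFlags]("whisk.fpc")
    println(s"per-minute throttles enabled: ${flags.usePerMinThrottles}")
  }
}
```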
@@ -229,68 +229,6 @@ class ThrottleTests
     settleThrottles(alreadyWaited)
   }
 }
 
-  it should "throttle 'concurrent' activations of one action" in withAssetCleaner(wskprops) { (wp, assetHelper) =>
-    val name = "checkConcurrentActionThrottle"
-    assetHelper.withCleaner(wsk.action, name) {
-      val timeoutAction = Some(TestUtils.getTestActionFilename("sleep.js"))
-      (action, _) =>
-        action.create(name, timeoutAction)
-    }
-
-    // The sleep is necessary as the load balancer currently has a latency before recognizing concurrency.
-    val sleep = 15.seconds
-    // Adding a bit of overcommit since some loadbalancers rely on some overcommit. This won't hurt those who don't
-    // since all activations are taken into account to check for throttled invokes below.
-    val slowInvokes = (maximumConcurrentInvokes * 1.2).toInt
-    val fastInvokes = 4
-    val fastInvokeDuration = 4.seconds
-    val slowInvokeDuration = sleep + fastInvokeDuration
-
-    // These invokes will stay active long enough that all are issued and load balancer has recognized concurrency.
-    val startSlowInvokes = Instant.now
-    val slowResults = untilThrottled(slowInvokes) { () =>
-      wsk.action.invoke(
-        name,
-        Map("sleepTimeInMs" -> slowInvokeDuration.toMillis.toJson),
-        expectedExitCode = DONTCARE_EXIT)
-    }
-    val afterSlowInvokes = Instant.now
-    val slowIssueDuration = durationBetween(startSlowInvokes, afterSlowInvokes)
-    println(
-      s"$slowInvokes slow invokes (dur = ${slowInvokeDuration.toSeconds} sec) took ${slowIssueDuration.toSeconds} seconds to issue")
-
-    // Sleep to let the background thread get the newest values (refreshes every 2 seconds)
-    println(s"Sleeping for ${sleep.toSeconds} sec")
-    Thread.sleep(sleep.toMillis)
-
-    // These fast invokes will trigger the concurrency-based throttling.
-    val startFastInvokes = Instant.now
-    val fastResults = untilThrottled(fastInvokes) { () =>
-      wsk.action.invoke(
-        name,
-        Map("sleepTimeInMs" -> fastInvokeDuration.toMillis.toJson),
-        expectedExitCode = DONTCARE_EXIT)
-    }
-    val afterFastInvokes = Instant.now
-    val fastIssueDuration = durationBetween(afterFastInvokes, startFastInvokes)
-    println(
-      s"$fastInvokes fast invokes (dur = ${fastInvokeDuration.toSeconds} sec) took ${fastIssueDuration.toSeconds} seconds to issue")
-
-    val combinedResults = slowResults ++ fastResults
-    try {
-      val throttledCount = throttledActivations(combinedResults, tooManyConcurrentRequests(0, 0))
-      throttledCount should be > 0
-    } finally {
-      val alreadyWaited = durationBetween(afterSlowInvokes, Instant.now)
-      settleThrottles(alreadyWaited)
-      println("clearing activations")
-    }
-    // wait for the activations last, giving the activations time to complete and
-    // may avoid unnecessarily polling; if these fail, the throttle may not be settled
-    println("waiting for activations to complete")
-    waitForActivations(combinedResults)
-    }
-  }
 
 @RunWith(classOf[JUnitRunner])

Review comment: This test assumes the ShardingPoolBalancer is used. The sharding pool balancer counts the number of activations on the fly and throttles requests based on the concurrent invocation limit. Please refer to this comment: https://github.com/apache/openwhisk/blob/master/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala#L683 In this regard, I removed this test case.

Reply: @style95 is there an alternate test that validates the behavior described in the comments?

Reply: It's not the same, but there are two related tests. And MemoryQueue populates these throttling flags with its state transition. There are also some other tests related to throttling like

@@ -458,19 +396,6 @@ class NamespaceSpecificThrottleTests
     }, 2, Some(1.second))
   }
 
-  it should "respect overridden concurrent throttle of 0" in withAssetCleaner(zeroConcProps) { (wp, assetHelper) =>
-    implicit val props = wp
-    val actionName = "zeroConcurrentAction"
-
-    assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
-      action.create(actionName, defaultAction)
-    }
-
-    wsk.action.invoke(actionName, expectedExitCode = TooManyRequests.intValue).stderr should {
-      include(prefix(tooManyConcurrentRequests(0, 0))) and include("allowed: 0")
-    }
-  }
-
   it should "not store an activation if disabled for this namespace" in withAssetCleaner(activationDisabled) {
     (wp, assetHelper) =>
       implicit val props = wp
@@ -101,55 +101,6 @@ class ConcurrencyTests extends TestHelpers with WskTestHelpers with WskActorSyst
     }
   }
 
-  //This tests generates the same load against the same action as previous test, BUT with concurrency set to 1
-  it should "execute activations sequentially when concurrency = 1 " in withAssetCleaner(wskprops) {
-    assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean))
-
-    (wp, assetHelper) =>
-      val name = "TestNonConcurrentAction"
-      assetHelper.withCleaner(wsk.action, name, confirmDelete = true) {
-        val actionName = TestUtils.getTestActionFilename("concurrent.js")
-        (action, _) =>
-          //disable log collection since concurrent activation requires specialized log processing
-          // (at action runtime and using specialized LogStore)
-          action.create(name, Some(actionName), logsize = Some(0.bytes), concurrency = Some(1))
-      }
-      //warm the container (concurrent activations with no warmed container, will cause multiple containers to be used - so we force one to warm up)
-      val run = wsk.action.invoke(name, Map("warm" -> 1.toJson), blocking = true)
-      withActivation(wsk.activation, run) { response =>
-        val logs = response.logs.get
-        withClue(logs) { logs.size shouldBe 0 }
-
-        response.response.status shouldBe "success"
-        response.response.result shouldBe Some(JsObject("warm" -> 1.toJson))
-      }
-
-      //read configs to determine max concurrency support - currently based on single invoker and invokerUserMemory config
-      val busyThreshold =
-        (loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool).userMemory / MemoryLimit.STD_MEMORY).toInt
-
-      //run maximum allowed concurrent actions via Futures
-      val requestCount = busyThreshold
-      println(s"executing $requestCount activations")
-      val runs = (1 to requestCount).map { _ =>
-        Future {
-          //expect only 1 activation concurrently (within the 1 second timeout implemented in concurrent.js)
-          wsk.action.invoke(name, Map("requestCount" -> 1.toJson), blocking = true)
-        }
-      }
-
-      //none of the actions will complete till the requestCount is reached
-      Await.result(Future.sequence(runs), 50.seconds).foreach { run =>
-        withActivation(wsk.activation, run) { response =>
-          val logs = response.logs.get
-          withClue(logs) { logs.size shouldBe 0 }
-          response.response.status shouldBe "success"
-          //expect only 1 activation concurrently
-          response.response.result shouldBe Some(JsObject("msg" -> s"Received 1 activations.".toJson))
-        }
-      }
-  }
-
   it should "allow concurrent activations to gracefully complete when one fails" in withAssetCleaner(wskprops) {
     assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean))
     (wp, assetHelper) =>

Review comment: This test case also assumes the sharding pool balancer is used.
@@ -0,0 +1,112 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

set -e

cat > ${GITHUB_WORKSPACE}/common/scala/src/main/resources/reference.conf << EOL
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

include "s3-reference.conf"

whisk.spi {
  ArtifactStoreProvider = org.apache.openwhisk.core.database.CouchDbStoreProvider
  ActivationStoreProvider = org.apache.openwhisk.core.database.ArtifactActivationStoreProvider
  MessagingProvider = org.apache.openwhisk.connector.kafka.KafkaMessagingProvider
  ContainerFactoryProvider = org.apache.openwhisk.core.containerpool.docker.DockerContainerFactoryProvider
  LogStoreProvider = org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider
  LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.ShardingContainerPoolBalancer
  EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.LocalEntitlementProvider
  AuthenticationDirectiveProvider = org.apache.openwhisk.core.controller.BasicAuthenticationDirective
  InvokerProvider = org.apache.openwhisk.core.invoker.InvokerReactive
  InvokerServerProvider = org.apache.openwhisk.core.invoker.DefaultInvokerServer
  DurationCheckerProvider = org.apache.openwhisk.core.scheduler.queue.NoopDurationCheckerProvider
}

dispatchers {
  # Custom dispatcher for CouchDB Client. Tune as needed.
  couch-dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"

    # Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor
    thread-pool-executor {
      # Min number of threads to cap factor-based corePoolSize number to
      core-pool-size-min = 2

      # The core-pool-size-factor is used to determine corePoolSize of the
      # ThreadPoolExecutor using the following formula:
      # ceil(available processors * factor).
      # Resulting size is then bounded by the core-pool-size-min and
      # core-pool-size-max values.
      core-pool-size-factor = 2.0

      # Max number of threads to cap factor-based corePoolSize number to
      core-pool-size-max = 32
    }
    # Throughput defines the number of messages that are processed in a batch
    # before the thread is returned to the pool. Set to 1 for as fair as possible.
    throughput = 5
  }

  # Custom dispatcher for Kafka client. Tune as needed.
  kafka-dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"

    # Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor
    thread-pool-executor {
      # Min number of threads to cap factor-based corePoolSize number to
      core-pool-size-min = 2

      # The core-pool-size-factor is used to determine corePoolSize of the
      # ThreadPoolExecutor using the following formula:
      # ceil(available processors * factor).
      # Resulting size is then bounded by the core-pool-size-min and
      # core-pool-size-max values.
      core-pool-size-factor = 2.0

      # Max number of threads to cap factor-based corePoolSize number to
      core-pool-size-max = 32
    }

    # Throughput defines the number of messages that are processed in a batch
    # before the thread is returned to the pool. Set to 1 for as fair as possible.
    throughput = 5
  }
  lease-service-dispatcher {
    type = PinnedDispatcher
    executor = "thread-pool-executor"
  }
}
EOL

Review comment: This script is to enable the ShardingPoolBalancer for the standalone tests.
Review comment: The akka client is not well aligned with the FunctionPollingContainerProxy. So I changed this to the traditional apache HTTP client.
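For illustration only, here is a minimal sketch of a blocking request with the classic Apache HttpClient 4.x, the style of client referred to above; the URL, port, and JSON payload are placeholders and are not taken from this PR.

```scala
import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.{ContentType, StringEntity}
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils

object HttpClientSketch {
  def main(args: Array[String]): Unit = {
    val client = HttpClients.createDefault()
    try {
      // Placeholder endpoint and payload; a container proxy would post activation
      // arguments to an action container in a similar blocking fashion.
      val post = new HttpPost("http://127.0.0.1:8080/run")
      post.setEntity(new StringEntity("""{"value": {"name": "world"}}""", ContentType.APPLICATION_JSON))
      val response = client.execute(post)
      try {
        println(s"status = ${response.getStatusLine.getStatusCode}")
        println(EntityUtils.toString(response.getEntity))
      } finally {
        response.close()
      }
    } finally {
      client.close()
    }
  }
}
```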