Skip to content

Enable the scheduler by default #5463

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/2-system.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ on:
env:
# openwhisk env
TEST_SUITE: System
ANSIBLE_CMD: "ansible-playbook -i environments/local -e docker_image_prefix=testing"
ANSIBLE_CMD: "ansible-playbook -i environments/local -e docker_image_prefix=testing -e container_pool_akka_client=false"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The akka client is not well aligned with the FunctionPollingContainerProxy.
So I changed this to the traditional apache HTTP client.

GRADLE_PROJS_SKIP: ""

## secrets
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/4-standalone.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ jobs:
sudo rm -rf "$AGENT_TOOLSDIRECTORY"
- name: Check free space
run: df -h
- name: Disable the scheduler
run: "./tools/github/disable-scheduler.sh"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The standalone mode is only supported with ShardingPoolBalancer, so we need to disable the scheduler for standalone tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No objections to the change, but do you think the standalone mode could be supported with the new scheduler or should we be looking to deprecate standalone mode? Long term, I'd think we would want to consolidate down to a single scheduler (the new one) to simplify maintenance.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that would be great to discuss and we can handle it in another issue if necessary.
I just wanted not to postpone the release of the new version of OpenWhisk Core due to this.

I am still unclear if we can support the standalone mode with the scheduler or even if it would be efficient to enable the scheduler in the standalone mode.
The scheduler works as a queue for actions and serves activations according to requests from containers(invokers).
Currently, a controller works as a controller and an invoker at the same time in the standalone mode.
If we put the scheduler into it, it would balance loads, provision containers, buffer activations, and send them to containers. We can make it work but I am not sure making one component that is in charge of all of these roles is aligned well with the goal of the standalone mode. I expect the standalone mode would be used in an environment with limited resources like an IoT machine. I have a concern that it might be too complex and require too many resources in such a circumstance.

- id: tests
name: Run Tests
run: "./tools/github/run${{ env.TEST_SUITE }}Tests.sh"
Expand Down
7 changes: 4 additions & 3 deletions ansible/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,13 @@ Set the value of pause-grace to 10s by default
.
```

#### Enable the scheduler
- Make sure you enable the scheduler by configuring `scheduler_enable`.
#### Disable the scheduler
- You can disable the scheduler by configuring `scheduler_enable`.
- The scheduler is enabled by default.

**ansible/environments/local/group_vars/all**
```yaml
scheduler_enable: true
scheduler_enable: false
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, the scheduler is enabled by default.

```

#### [Optional] Enable ElasticSearch Activation Store
Expand Down
2 changes: 1 addition & 1 deletion ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ durationChecker:
spi: "{{ duration_checker_spi | default('') }}"
timeWindow: "{{ duration_checker_time_window | default('1 d') }}"

enable_scheduler: "{{ scheduler_enable | default(false) }}"
enable_scheduler: "{{ scheduler_enable | default(true) }}"

scheduler:
protocol: "{{ scheduler_protocol | default('http') }}"
Expand Down
10 changes: 5 additions & 5 deletions common/scala/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ whisk.spi {
MessagingProvider = org.apache.openwhisk.connector.kafka.KafkaMessagingProvider
ContainerFactoryProvider = org.apache.openwhisk.core.containerpool.docker.DockerContainerFactoryProvider
LogStoreProvider = org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider
LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.ShardingContainerPoolBalancer
EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.LocalEntitlementProvider
LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.FPCPoolBalancer
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scheduler-related SPI providers are used by default.

EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.FPCEntitlementProvider
AuthenticationDirectiveProvider = org.apache.openwhisk.core.controller.BasicAuthenticationDirective
InvokerProvider = org.apache.openwhisk.core.invoker.InvokerReactive
InvokerServerProvider = org.apache.openwhisk.core.invoker.DefaultInvokerServer
DurationCheckerProvider = org.apache.openwhisk.core.scheduler.queue.NoopDurationCheckerProvider
InvokerProvider = org.apache.openwhisk.core.invoker.FPCInvokerReactive
InvokerServerProvider = org.apache.openwhisk.core.invoker.FPCInvokerServer
DurationCheckerProvider = org.apache.openwhisk.core.scheduler.queue.ElasticSearchDurationCheckerProvider
}

dispatchers {
Expand Down
2 changes: 1 addition & 1 deletion core/controller/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ whisk {
timeout-addon = 1m

fpc {
use-per-min-throttles = false
use-per-min-throttles = true
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is required to pass per-minute throttling tests.

}
}
controller {
Expand Down
5 changes: 3 additions & 2 deletions tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def systemIncludes = [
"org/apache/openwhisk/core/apigw/actions/test/**",
"org/apache/openwhisk/core/database/test/*CacheConcurrencyTests*",
"org/apache/openwhisk/core/controller/test/*ControllerApiTests*",
"org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheck*",
"apigw/healthtests/**",
"ha/**",
"services/**",
Expand All @@ -78,7 +79,7 @@ ext.testSets = [
"org/apache/openwhisk/core/limits/**",
"org/apache/openwhisk/core/scheduler/**",
"org/apache/openwhisk/core/invoker/test/*InvokerBootUpTests*",
"org/apache/openwhisk/core/loadBalancer/test/*FPCPoolBalancerTests*",
"org/apache/openwhisk/core/scheduler/queue/test/ElasticSearchDurationCheck*",
"org/apache/openwhisk/common/etcd/**",
"**/*CacheConcurrencyTests*",
"**/*ControllerApiTests*",
Expand All @@ -89,6 +90,7 @@ ext.testSets = [
"REQUIRE_SYSTEM" : [
"includes" : systemIncludes,
"excludes": [
"org/apache/openwhisk/core/loadBalancer/test/*FPCPoolBalancerTests*",
"system/basic/WskMultiRuntimeTests*",
'invokerShoot/**'
]
Expand All @@ -99,7 +101,6 @@ ext.testSets = [
"org/apache/openwhisk/core/containerpool/v2/test/**",
"org/apache/openwhisk/core/scheduler/**",
"org/apache/openwhisk/core/invoker/test/*InvokerBootUpTests*",
"org/apache/openwhisk/core/loadBalancer/test/*FPCPoolBalancerTests*",
"org/apache/openwhisk/core/service/**",
]
],
Expand Down
75 changes: 0 additions & 75 deletions tests/src/test/scala/limits/ThrottleTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -229,68 +229,6 @@ class ThrottleTests
settleThrottles(alreadyWaited)
}
}

it should "throttle 'concurrent' activations of one action" in withAssetCleaner(wskprops) { (wp, assetHelper) =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test assumes the ShardingPoolBalancer is used.
The concurrency in the FPCscheduler differs from the ShardingPoolBalancer's one.

The sharding pool balancer counts the number of activations on the fly and throttles requests based on the concurrent invocation limit.
On the other hand, in the new scheduler, the concurrent limit implies the number of concurrent containers for a given action. Throttling works based on the processing power of existing containers.
When the maximum number of containers are being used and another action is invoked for the first time, it can't create more containers, it will reject the request with 429 too many requests.

Please refer to this comment: https://github.com/apache/openwhisk/blob/master/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala#L683

In this regard, I removed this test case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@style95 is there an alternate test that validates the behavior described int the comments?

Copy link
Member Author

@style95 style95 Feb 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not the same, but there are two related tests.
The load balancer gets throttling flags from ETCD and rejects/accepts requests according to it.
So this test covers the behavior of the load balancer that decides based on the throttling flags.
https://github.com/apache/openwhisk/blob/master/tests/src/test/scala/org/apache/openwhisk/core/controller/test/FPCEntitlementTests.scala

And MemoryQueue populates these throttling flags with its state transition.
So this test covers if the memory queue sets the throttling key when it changes its state to a throttled state.
https://github.com/apache/openwhisk/blob/master/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala#L363

There are also some other tests related to throttling like

val name = "checkConcurrentActionThrottle"
assetHelper.withCleaner(wsk.action, name) {
val timeoutAction = Some(TestUtils.getTestActionFilename("sleep.js"))
(action, _) =>
action.create(name, timeoutAction)
}

// The sleep is necessary as the load balancer currently has a latency before recognizing concurrency.
val sleep = 15.seconds
// Adding a bit of overcommit since some loadbalancers rely on some overcommit. This won't hurt those who don't
// since all activations are taken into account to check for throttled invokes below.
val slowInvokes = (maximumConcurrentInvokes * 1.2).toInt
val fastInvokes = 4
val fastInvokeDuration = 4.seconds
val slowInvokeDuration = sleep + fastInvokeDuration

// These invokes will stay active long enough that all are issued and load balancer has recognized concurrency.
val startSlowInvokes = Instant.now
val slowResults = untilThrottled(slowInvokes) { () =>
wsk.action.invoke(
name,
Map("sleepTimeInMs" -> slowInvokeDuration.toMillis.toJson),
expectedExitCode = DONTCARE_EXIT)
}
val afterSlowInvokes = Instant.now
val slowIssueDuration = durationBetween(startSlowInvokes, afterSlowInvokes)
println(
s"$slowInvokes slow invokes (dur = ${slowInvokeDuration.toSeconds} sec) took ${slowIssueDuration.toSeconds} seconds to issue")

// Sleep to let the background thread get the newest values (refreshes every 2 seconds)
println(s"Sleeping for ${sleep.toSeconds} sec")
Thread.sleep(sleep.toMillis)

// These fast invokes will trigger the concurrency-based throttling.
val startFastInvokes = Instant.now
val fastResults = untilThrottled(fastInvokes) { () =>
wsk.action.invoke(
name,
Map("sleepTimeInMs" -> fastInvokeDuration.toMillis.toJson),
expectedExitCode = DONTCARE_EXIT)
}
val afterFastInvokes = Instant.now
val fastIssueDuration = durationBetween(afterFastInvokes, startFastInvokes)
println(
s"$fastInvokes fast invokes (dur = ${fastInvokeDuration.toSeconds} sec) took ${fastIssueDuration.toSeconds} seconds to issue")

val combinedResults = slowResults ++ fastResults
try {
val throttledCount = throttledActivations(combinedResults, tooManyConcurrentRequests(0, 0))
throttledCount should be > 0
} finally {
val alreadyWaited = durationBetween(afterSlowInvokes, Instant.now)
settleThrottles(alreadyWaited)
println("clearing activations")
}
// wait for the activations last, giving the activations time to complete and
// may avoid unnecessarily polling; if these fail, the throttle may not be settled
println("waiting for activations to complete")
waitForActivations(combinedResults)
}
}

@RunWith(classOf[JUnitRunner])
Expand Down Expand Up @@ -458,19 +396,6 @@ class NamespaceSpecificThrottleTests
}, 2, Some(1.second))
}

it should "respect overridden concurrent throttle of 0" in withAssetCleaner(zeroConcProps) { (wp, assetHelper) =>
implicit val props = wp
val actionName = "zeroConcurrentAction"

assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
action.create(actionName, defaultAction)
}

wsk.action.invoke(actionName, expectedExitCode = TooManyRequests.intValue).stderr should {
include(prefix(tooManyConcurrentRequests(0, 0))) and include("allowed: 0")
}
}

it should "not store an activation if disabled for this namespace" in withAssetCleaner(activationDisabled) {
(wp, assetHelper) =>
implicit val props = wp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class FPCEntitlementProviderTests extends ControllerTestCommon with ScalaFutures
val someUser = WhiskAuthHelpers.newIdentity()
val action = FullyQualifiedEntityName(EntityPath("testns"), EntityName("action"))
val loadBalancer = mock[LoadBalancer]
(loadBalancer.clusterSize _).expects().returning(1).anyNumberOfTimes()
(loadBalancer
.checkThrottle(_: EntityPath, _: String))
.expects(someUser.namespace.name.toPath, action.fullPath.asString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,55 +101,6 @@ class ConcurrencyTests extends TestHelpers with WskTestHelpers with WskActorSyst
}
}

//This tests generates the same load against the same action as previous test, BUT with concurrency set to 1
it should "execute activations sequentially when concurrency = 1 " in withAssetCleaner(wskprops) {
assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean))

(wp, assetHelper) =>
val name = "TestNonConcurrentAction"
assetHelper.withCleaner(wsk.action, name, confirmDelete = true) {
val actionName = TestUtils.getTestActionFilename("concurrent.js")
(action, _) =>
//disable log collection since concurrent activation requires specialized log processing
// (at action runtime and using specialized LogStore)
action.create(name, Some(actionName), logsize = Some(0.bytes), concurrency = Some(1))
}
//warm the container (concurrent activations with no warmed container, will cause multiple containers to be used - so we force one to warm up)
val run = wsk.action.invoke(name, Map("warm" -> 1.toJson), blocking = true)
withActivation(wsk.activation, run) { response =>
val logs = response.logs.get
withClue(logs) { logs.size shouldBe 0 }

response.response.status shouldBe "success"
response.response.result shouldBe Some(JsObject("warm" -> 1.toJson))
}

//read configs to determine max concurrency support - currently based on single invoker and invokerUserMemory config
val busyThreshold =
(loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool).userMemory / MemoryLimit.STD_MEMORY).toInt

//run maximum allowed concurrent actions via Futures
val requestCount = busyThreshold
println(s"executing $requestCount activations")
val runs = (1 to requestCount).map { _ =>
Future {
//expect only 1 activation concurrently (within the 1 second timeout implemented in concurrent.js)
wsk.action.invoke(name, Map("requestCount" -> 1.toJson), blocking = true)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case also assumes the sharding pool balancer is used.
It expects multiple containers to be created and each of them receives only 1 request each.
But with the new scheduler, one container could receive multiple activations.

}
}

//none of the actions will complete till the requestCount is reached
Await.result(Future.sequence(runs), 50.seconds).foreach { run =>
withActivation(wsk.activation, run) { response =>
val logs = response.logs.get
withClue(logs) { logs.size shouldBe 0 }
response.response.status shouldBe "success"
//expect only 1 activation concurrently
response.response.result shouldBe Some(JsObject("msg" -> s"Received 1 activations.".toJson))
}
}
}

it should "allow concurrent activations to gracefully complete when one fails" in withAssetCleaner(wskprops) {
assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean))
(wp, assetHelper) =>
Expand Down
112 changes: 112 additions & 0 deletions tools/github/disable-scheduler.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

set -e

cat > ${GITHUB_WORKSPACE}/common/scala/src/main/resources/reference.conf << EOL
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

include "s3-reference.conf"

whisk.spi {
ArtifactStoreProvider = org.apache.openwhisk.core.database.CouchDbStoreProvider
ActivationStoreProvider = org.apache.openwhisk.core.database.ArtifactActivationStoreProvider
MessagingProvider = org.apache.openwhisk.connector.kafka.KafkaMessagingProvider
ContainerFactoryProvider = org.apache.openwhisk.core.containerpool.docker.DockerContainerFactoryProvider
LogStoreProvider = org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider
LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.ShardingContainerPoolBalancer
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This script is to enable the ShardingPoolBalancer for the standalone tests.

EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.LocalEntitlementProvider
AuthenticationDirectiveProvider = org.apache.openwhisk.core.controller.BasicAuthenticationDirective
InvokerProvider = org.apache.openwhisk.core.invoker.InvokerReactive
InvokerServerProvider = org.apache.openwhisk.core.invoker.DefaultInvokerServer
DurationCheckerProvider = org.apache.openwhisk.core.scheduler.queue.NoopDurationCheckerProvider
}

dispatchers {
# Custom dispatcher for CouchDB Client. Tune as needed.
couch-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"

# Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor
thread-pool-executor {
# Min number of threads to cap factor-based corePoolSize number to
core-pool-size-min = 2

# The core-pool-size-factor is used to determine corePoolSize of the
# ThreadPoolExecutor using the following formula:
# ceil(available processors * factor).
# Resulting size is then bounded by the core-pool-size-min and
# core-pool-size-max values.
core-pool-size-factor = 2.0

# Max number of threads to cap factor-based corePoolSize number to
core-pool-size-max = 32
}
# Throughput defines the number of messages that are processed in a batch
# before the thread is returned to the pool. Set to 1 for as fair as possible.
throughput = 5
}

# Custom dispatcher for Kafka client. Tune as needed.
kafka-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"

# Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor
thread-pool-executor {
# Min number of threads to cap factor-based corePoolSize number to
core-pool-size-min = 2

# The core-pool-size-factor is used to determine corePoolSize of the
# ThreadPoolExecutor using the following formula:
# ceil(available processors * factor).
# Resulting size is then bounded by the core-pool-size-min and
# core-pool-size-max values.
core-pool-size-factor = 2.0

# Max number of threads to cap factor-based corePoolSize number to
core-pool-size-max = 32
}

# Throughput defines the number of messages that are processed in a batch
# before the thread is returned to the pool. Set to 1 for as fair as possible.
throughput = 5
}
lease-service-dispatcher {
type = PinnedDispatcher
executor = "thread-pool-executor"
}
}
EOL