Skip to content

Commit c2ea992

Browse files
committed
Implement PFCInvokerServer
1 parent cd6fded commit c2ea992

File tree

10 files changed

+442
-7
lines changed

10 files changed

+442
-7
lines changed

ansible/group_vars/all

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,8 @@ invoker:
219219
keystore:
220220
password: "{{ invoker_keystore_password | default('openwhisk') }}"
221221
name: "{{ __invoker_ssl_keyPrefix }}openwhisk-keystore.p12"
222+
reactiveSpi: "{{ invokerReactive_spi | default('') }}"
223+
serverSpi: "{{ invokerServer_spi | default('') }}"
222224

223225
userLogs:
224226
spi: "{{ userLogs_spi | default('org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider') }}"

ansible/roles/invoker/tasks/deploy.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@
259259
"CONFIG_kamon_statsd_hostname": "{{ metrics.kamon.host }}"
260260
"CONFIG_kamon_statsd_port": "{{ metrics.kamon.port }}"
261261
"CONFIG_whisk_spi_LogStoreProvider": "{{ userLogs.spi }}"
262+
"CONFIG_whisk_spi_InvokerProvider": "{{ invoker.reactiveSpi }}"
263+
"CONFIG_whisk_spi_InvokerServerProvider": "{{ invoker.serverSpi }}"
262264
"CONFIG_logback_log_level": "{{ invoker.loglevel }}"
263265
"CONFIG_whisk_memory_min": "{{ limit_action_memory_min | default() }}"
264266
"CONFIG_whisk_memory_max": "{{ limit_action_memory_max | default() }}"

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ class InvokerHealthManager(instanceId: InvokerInstanceId,
153153
data.memory.inProgressMemory,
154154
instanceId.tags,
155155
instanceId.dedicatedNamespaces)
156+
InvokerHealthManager.useMemory = invokerResourceMessage.busyMemory + invokerResourceMessage.inProgressMemory
156157
dataManagementService ! UpdateDataOnChange(InvokerKeys.health(instanceId), invokerResourceMessage.serialize)
157158
stay using data.copy(currentInvokerResource = Some(invokerResourceMessage))
158159

@@ -269,6 +270,7 @@ object InvokerHealthManager {
269270
val healthActionNamePrefix = "invokerHealthTestAction"
270271
val bufferSize = 10
271272
val bufferErrorTolerance = 3
273+
var useMemory = 0l
272274
val healthActionIdentity: Identity = {
273275
val whiskSystem = "whisk.system"
274276
val uuid = UUID()
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.invoker
19+
20+
import akka.actor.ActorSystem
21+
import akka.http.scaladsl.model.StatusCodes
22+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
23+
import akka.http.scaladsl.server.Route
24+
import org.apache.openwhisk.common.{Logging, TransactionId}
25+
import org.apache.openwhisk.http.BasicRasService
26+
import org.apache.openwhisk.http.ErrorResponse.terminate
27+
import spray.json.PrettyPrinter
28+
29+
import scala.concurrent.ExecutionContext
30+
31+
/**
32+
* Implements web server to handle certain REST API calls.
33+
*/
34+
class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, systemPassword: String)(
35+
implicit val ec: ExecutionContext,
36+
val actorSystem: ActorSystem,
37+
val logger: Logging)
38+
extends BasicRasService {
39+
40+
/** Pretty print JSON response. */
41+
implicit val jsonPrettyResponsePrinter = PrettyPrinter
42+
43+
override def routes(implicit transid: TransactionId): Route = {
44+
super.routes ~ extractCredentials {
45+
case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
46+
(path("enable") & post) {
47+
invoker.enable()
48+
} ~ (path("disable") & post) {
49+
invoker.disable()
50+
}
51+
case _ => terminate(StatusCodes.Unauthorized)
52+
}
53+
}
54+
}
55+
56+
object DefaultInvokerServer extends InvokerServerProvider {
57+
58+
// TODO: TBD, after FPCInvokerReactive is ready, can read the credentials from pureconfig
59+
val invokerUsername = "admin"
60+
val invokerPassword = "admin"
61+
62+
override def instance(
63+
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
64+
new DefaultInvokerServer(invoker, invokerUsername, invokerPassword)
65+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.invoker
19+
20+
import akka.actor.ActorSystem
21+
import akka.http.scaladsl.model.StatusCodes
22+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
23+
import akka.http.scaladsl.server.Route
24+
import org.apache.openwhisk.common.{Logging, TransactionId}
25+
import org.apache.openwhisk.core.containerpool.v2.InvokerHealthManager
26+
import org.apache.openwhisk.http.BasicRasService
27+
import org.apache.openwhisk.http.ErrorResponse.terminate
28+
import spray.json.PrettyPrinter
29+
30+
import scala.concurrent.ExecutionContext
31+
32+
/**
33+
* Implements web server to handle certain REST API calls.
34+
*/
35+
class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemPassword: String)(
36+
implicit val ec: ExecutionContext,
37+
val actorSystem: ActorSystem,
38+
val logger: Logging)
39+
extends BasicRasService {
40+
41+
/** Pretty print JSON response. */
42+
implicit val jsonPrettyResponsePrinter = PrettyPrinter
43+
44+
override def routes(implicit transid: TransactionId): Route = {
45+
super.routes ~ extractCredentials {
46+
case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
47+
(path("enable") & post) {
48+
invoker.enable()
49+
} ~ (path("disable") & post) {
50+
invoker.disable()
51+
} ~ (path("memory") & get) {
52+
complete(InvokerHealthManager.useMemory.toString)
53+
}
54+
case _ => terminate(StatusCodes.Unauthorized)
55+
}
56+
}
57+
}
58+
59+
object FPCInvokerServer extends InvokerServerProvider {
60+
61+
// TODO: TBD, after FPCInvokerReactive is ready, can read the credentials from pureconfig
62+
val invokerUsername = "admin"
63+
val invokerPassword = "admin"
64+
65+
override def instance(
66+
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
67+
new FPCInvokerServer(invoker, invokerUsername, invokerPassword)
68+
}

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.invoker
1919

2020
import akka.Done
2121
import akka.actor.{ActorSystem, CoordinatedShutdown}
22+
import akka.http.scaladsl.server.Route
2223
import akka.stream.ActorMaterializer
2324
import com.typesafe.config.ConfigValueFactory
2425
import kamon.Kamon
@@ -217,7 +218,10 @@ trait InvokerProvider extends Spi {
217218
}
218219

219220
// this trait can be used to add common implementation
220-
trait InvokerCore {}
221+
trait InvokerCore {
222+
def enable(): Route
223+
def disable(): Route
224+
}
221225

222226
/**
223227
* An Spi for providing RestAPI implementation for invoker.
@@ -227,9 +231,3 @@ trait InvokerServerProvider extends Spi {
227231
def instance(
228232
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService
229233
}
230-
231-
object DefaultInvokerServer extends InvokerServerProvider {
232-
override def instance(
233-
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
234-
new BasicRasService {}
235-
}

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import java.time.Instant
2323
import akka.Done
2424
import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
2525
import akka.event.Logging.InfoLevel
26+
import akka.http.scaladsl.server.Route
2627
import akka.stream.ActorMaterializer
2728
import org.apache.openwhisk.common._
2829
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
@@ -299,4 +300,8 @@ class InvokerReactive(
299300
}
300301
})
301302

303+
override def enable(): Route = ???
304+
305+
override def disable(): Route = ???
306+
302307
}

tests/src/test/scala/common/WhiskProperties.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,10 @@ public static String getBaseControllerAddress() {
270270
return getBaseControllerHost() + ":" + getControllerBasePort();
271271
}
272272

273+
public static String getBaseInvokerAddress(){
274+
return getInvokerHosts()[0] + ":" + whiskProperties.getProperty("invoker.hosts.basePort");
275+
}
276+
273277
public static int getMaxActionInvokesPerMinute() {
274278
String valStr = whiskProperties.getProperty("limits.actions.invokes.perMinute");
275279
return Integer.parseInt(valStr);
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.invoker.test
19+
20+
import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized}
21+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
22+
import akka.http.scaladsl.server.Route
23+
import akka.http.scaladsl.testkit.ScalatestRouteTest
24+
import common.StreamLogging
25+
import org.apache.openwhisk.common.TransactionId
26+
import org.apache.openwhisk.core.invoker.{DefaultInvokerServer, InvokerCore}
27+
import org.apache.openwhisk.http.BasicHttpService
28+
import org.junit.runner.RunWith
29+
import org.scalamock.scalatest.MockFactory
30+
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
31+
import org.scalatest.junit.JUnitRunner
32+
33+
/**
34+
* Tests InvokerServer API.
35+
*/
36+
@RunWith(classOf[JUnitRunner])
37+
class DefaultInvokerServerTests
38+
extends FlatSpec
39+
with BeforeAndAfterEach
40+
with BeforeAndAfterAll
41+
with ScalatestRouteTest
42+
with Matchers
43+
with StreamLogging
44+
with MockFactory {
45+
46+
def transid() = TransactionId("tid")
47+
48+
val systemUsername = "username"
49+
val systemPassword = "password"
50+
51+
val reactive = new TestInvokerReactive
52+
val server = new DefaultInvokerServer(reactive, systemUsername, systemPassword)
53+
54+
override protected def afterEach(): Unit = reactive.reset()
55+
56+
/** DefaultInvokerServer API tests */
57+
behavior of "DefaultInvokerServer API"
58+
59+
it should "enable invoker" in {
60+
implicit val tid = transid()
61+
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
62+
Post(s"/enable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
63+
status should be(OK)
64+
reactive.enableCount shouldBe 1
65+
reactive.disableCount shouldBe 0
66+
}
67+
}
68+
69+
it should "disable invoker" in {
70+
implicit val tid = transid()
71+
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
72+
Post(s"/disable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
73+
status should be(OK)
74+
reactive.enableCount shouldBe 0
75+
reactive.disableCount shouldBe 1
76+
}
77+
}
78+
79+
it should "not enable invoker with invalid credential" in {
80+
implicit val tid = transid()
81+
val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
82+
Post(s"/enable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
83+
status should be(Unauthorized)
84+
reactive.enableCount shouldBe 0
85+
reactive.disableCount shouldBe 0
86+
}
87+
}
88+
89+
it should "not disable invoker with invalid credential" in {
90+
implicit val tid = transid()
91+
val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
92+
Post(s"/disable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
93+
status should be(Unauthorized)
94+
reactive.enableCount shouldBe 0
95+
reactive.disableCount shouldBe 0
96+
}
97+
}
98+
99+
it should "not enable invoker with empty credential" in {
100+
implicit val tid = transid()
101+
Post(s"/enable") ~> Route.seal(server.routes(tid)) ~> check {
102+
status should be(Unauthorized)
103+
reactive.enableCount shouldBe 0
104+
reactive.disableCount shouldBe 0
105+
}
106+
}
107+
108+
it should "not disable invoker with empty credential" in {
109+
implicit val tid = transid()
110+
Post(s"/disable") ~> Route.seal(server.routes(tid)) ~> check {
111+
status should be(Unauthorized)
112+
reactive.enableCount shouldBe 0
113+
reactive.disableCount shouldBe 0
114+
}
115+
}
116+
117+
}
118+
119+
class TestInvokerReactive extends InvokerCore with BasicHttpService {
120+
var enableCount = 0
121+
var disableCount = 0
122+
123+
override def enable(): Route = {
124+
enableCount += 1
125+
complete("")
126+
}
127+
128+
override def disable(): Route = {
129+
disableCount += 1
130+
complete("")
131+
}
132+
133+
def reset(): Unit = {
134+
enableCount = 0
135+
disableCount = 0
136+
}
137+
138+
/**
139+
* Gets the routes implemented by the HTTP service.
140+
*
141+
* @param transid the id for the transaction (every request is assigned an id)
142+
*/
143+
override def routes(implicit transid: TransactionId): Route = ???
144+
145+
}

0 commit comments

Comments
 (0)