Skip to content

Commit faa7bab

Browse files
committed
Implement PFCInvokerServer
1 parent 59b67fe commit faa7bab

File tree

11 files changed

+522
-13
lines changed

11 files changed

+522
-13
lines changed

ansible/group_vars/all

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -453,5 +453,5 @@ etcd_connect_string: "{% set ret = [] %}\
453453

454454
scheduler:
455455
dataManagementService:
456-
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default(1 second) }}"
456+
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
457457

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,26 @@ class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef
175175
val raw = new String(bytes, StandardCharsets.UTF_8)
176176
PingMessage.parse(raw) match {
177177
case Success(p: PingMessage) =>
178-
self ! p
179-
invokerPingFeed ! MessageFeed.Processed
178+
if (p.instance.instance < 0) {
179+
// Set the invoker status to UnHealthy immediately. Because if controller receives the success result of
180+
// action execute continuously, it will judge that invoker as up status all the time(It will not judge
181+
// the invoker status by timeout mechanism)
182+
logging.info(this, s"get invoker unhealthy message: invoker${Math.abs(p.instance.toInt) - 1}")
183+
status = status.updated(
184+
Math.abs(p.instance.toInt) - 1,
185+
new InvokerHealth(
186+
InvokerInstanceId(
187+
Math.abs(p.instance.toInt) - 1,
188+
p.instance.uniqueName,
189+
p.instance.displayedName,
190+
p.instance.userMemory),
191+
Unhealthy))
192+
logStatus()
193+
invokerPingFeed ! MessageFeed.Processed
194+
} else {
195+
self ! p
196+
invokerPingFeed ! MessageFeed.Processed
197+
}
180198

181199
case Failure(t) =>
182200
invokerPingFeed ! MessageFeed.Processed

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ class InvokerHealthManager(instanceId: InvokerInstanceId,
153153
data.memory.inProgressMemory,
154154
instanceId.tags,
155155
instanceId.dedicatedNamespaces)
156+
InvokerHealthManager.useMemory = invokerResourceMessage.busyMemory + invokerResourceMessage.inProgressMemory
156157
dataManagementService ! UpdateDataOnChange(InvokerKeys.health(instanceId), invokerResourceMessage.serialize)
157158
stay using data.copy(currentInvokerResource = Some(invokerResourceMessage))
158159

@@ -269,6 +270,7 @@ object InvokerHealthManager {
269270
val healthActionNamePrefix = "invokerHealthTestAction"
270271
val bufferSize = 10
271272
val bufferErrorTolerance = 3
273+
var useMemory = 0l
272274
val healthActionIdentity: Identity = {
273275
val whiskSystem = "whisk.system"
274276
val uuid = UUID()
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package org.apache.openwhisk.core.invoker
2+
3+
import akka.actor.ActorSystem
4+
import akka.http.scaladsl.model.StatusCodes
5+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
6+
import akka.http.scaladsl.server.Route
7+
import org.apache.openwhisk.common.{Logging, TransactionId}
8+
import org.apache.openwhisk.http.BasicRasService
9+
import org.apache.openwhisk.http.ErrorResponse.terminate
10+
import spray.json.PrettyPrinter
11+
12+
import scala.concurrent.ExecutionContext
13+
14+
/**
15+
* Implements web server to handle certain REST API calls.
16+
*/
17+
class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, systemPassword: String)(
18+
implicit val ec: ExecutionContext,
19+
val actorSystem: ActorSystem,
20+
val logger: Logging)
21+
extends BasicRasService {
22+
23+
/** Pretty print JSON response. */
24+
implicit val jsonPrettyResponsePrinter = PrettyPrinter
25+
26+
override def routes(implicit transid: TransactionId): Route = {
27+
super.routes ~ extractCredentials {
28+
case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
29+
(path("enable") & post) {
30+
invoker.enable()
31+
} ~ (path("disable") & post) {
32+
invoker.disable()
33+
}
34+
case _ => terminate(StatusCodes.Unauthorized)
35+
}
36+
}
37+
}
38+
39+
object DefaultInvokerServer extends InvokerServerProvider {
40+
41+
// TODO: TBD, after FPCInvokerReactive is ready, can read the credentials from pureconfig
42+
val invokerUsername = "admin"
43+
val invokerPassword = "admin"
44+
45+
override def instance(
46+
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
47+
new BasicRasService {
48+
new DefaultInvokerServer(invoker, invokerUsername, invokerPassword)
49+
}
50+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package org.apache.openwhisk.core.invoker
2+
3+
import akka.actor.ActorSystem
4+
import akka.http.scaladsl.model.StatusCodes
5+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
6+
import akka.http.scaladsl.server.Route
7+
import org.apache.openwhisk.common.{Logging, TransactionId}
8+
import org.apache.openwhisk.core.containerpool.v2.InvokerHealthManager
9+
import org.apache.openwhisk.http.BasicRasService
10+
import org.apache.openwhisk.http.ErrorResponse.terminate
11+
import spray.json.PrettyPrinter
12+
13+
import scala.concurrent.ExecutionContext
14+
15+
/**
16+
* Implements web server to handle certain REST API calls.
17+
*/
18+
class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemPassword: String)(
19+
implicit val ec: ExecutionContext,
20+
val actorSystem: ActorSystem,
21+
val logger: Logging)
22+
extends BasicRasService {
23+
24+
/** Pretty print JSON response. */
25+
implicit val jsonPrettyResponsePrinter = PrettyPrinter
26+
27+
override def routes(implicit transid: TransactionId): Route = {
28+
super.routes ~ extractCredentials {
29+
case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
30+
(path("enable") & post) {
31+
invoker.enable()
32+
} ~ (path("disable") & post) {
33+
invoker.disable()
34+
} ~ (path("memory") & get) {
35+
complete(InvokerHealthManager.useMemory.toString)
36+
}
37+
case _ => terminate(StatusCodes.Unauthorized)
38+
}
39+
}
40+
}
41+
42+
object FPCInvokerServer extends InvokerServerProvider {
43+
44+
// TODO: TBD, after FPCInvokerReactive is ready, can read the credentials from pureconfig
45+
val invokerUsername = "admin"
46+
val invokerPassword = "admin"
47+
48+
override def instance(
49+
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
50+
new FPCInvokerServer(invoker, invokerUsername, invokerPassword)
51+
}

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.invoker
1919

2020
import akka.Done
2121
import akka.actor.{ActorSystem, CoordinatedShutdown}
22+
import akka.http.scaladsl.server.Route
2223
import akka.stream.ActorMaterializer
2324
import com.typesafe.config.ConfigValueFactory
2425
import kamon.Kamon
@@ -217,7 +218,10 @@ trait InvokerProvider extends Spi {
217218
}
218219

219220
// this trait can be used to add common implementation
220-
trait InvokerCore {}
221+
trait InvokerCore {
222+
def enable(): Route
223+
def disable(): Route
224+
}
221225

222226
/**
223227
* An Spi for providing RestAPI implementation for invoker.
@@ -227,9 +231,3 @@ trait InvokerServerProvider extends Spi {
227231
def instance(
228232
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService
229233
}
230-
231-
object DefaultInvokerServer extends InvokerServerProvider {
232-
override def instance(
233-
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
234-
new BasicRasService {}
235-
}

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import java.nio.charset.StandardCharsets
2121
import java.time.Instant
2222

2323
import akka.Done
24-
import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
24+
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, PoisonPill, Props}
2525
import akka.event.Logging.InfoLevel
26+
import akka.http.scaladsl.server.Directives.complete
27+
import akka.http.scaladsl.server.Route
2628
import akka.stream.ActorMaterializer
2729
import org.apache.openwhisk.common._
2830
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
@@ -293,10 +295,52 @@ class InvokerReactive(
293295
}
294296

295297
private val healthProducer = msgProvider.getProducer(config)
296-
Scheduler.scheduleWaitAtMost(1.seconds)(() => {
298+
var scheduler: Option[ActorRef] = Some(Scheduler.scheduleWaitAtMost(1.seconds)(() => {
297299
healthProducer.send("health", PingMessage(instance)).andThen {
298300
case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
299301
}
300-
})
302+
}))
303+
304+
override def enable(): Route = {
305+
scheduler match {
306+
case Some(_) => complete("Can't enable invoker again");
307+
case None =>
308+
scheduler = Some(Scheduler.scheduleWaitAtMost(1.seconds)(() => {
309+
healthProducer
310+
.send("health", PingMessage(instance))
311+
.andThen {
312+
case Failure(t) => logging.error(this, s"enable invoker, failed to ping the controller: $t")
313+
}
314+
}))
315+
complete("Success enable invoker");
316+
}
317+
}
318+
319+
override def disable(): Route = {
320+
scheduler match {
321+
case Some(schedulerActor) =>
322+
// Through negative number of invoker to pass unhealthy message
323+
healthProducer
324+
.send(
325+
"health",
326+
PingMessage(
327+
InvokerInstanceId(
328+
-1 - instance.instance,
329+
instance.uniqueName,
330+
instance.displayedName,
331+
instance.userMemory)),
332+
3)
333+
.andThen {
334+
case Success(_) =>
335+
schedulerActor ! PoisonPill
336+
scheduler = None
337+
logging.info(this, "send unhealthy message successfully")
338+
case Failure(_) =>
339+
logging.info(this, "failed to send unhealthy message")
340+
}
341+
complete("Success disable invoker");
342+
case None => complete("Can't disable invoker again")
343+
}
344+
}
301345

302346
}

tests/src/test/scala/common/WhiskProperties.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,10 @@ public static String getBaseControllerAddress() {
270270
return getBaseControllerHost() + ":" + getControllerBasePort();
271271
}
272272

273+
public static String getBaseInvokerAddress(){
274+
return getInvokerHosts()[0] + ":" + whiskProperties.getProperty("invoker.hosts.basePort");
275+
}
276+
273277
public static int getMaxActionInvokesPerMinute() {
274278
String valStr = whiskProperties.getProperty("limits.actions.invokes.perMinute");
275279
return Integer.parseInt(valStr);
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package org.apache.openwhisk.core.invoker.test
2+
3+
import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized}
4+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
5+
import akka.http.scaladsl.server.Route
6+
import akka.http.scaladsl.testkit.ScalatestRouteTest
7+
import common.StreamLogging
8+
import org.apache.openwhisk.common.TransactionId
9+
import org.apache.openwhisk.core.invoker.{DefaultInvokerServer, InvokerCore}
10+
import org.apache.openwhisk.http.BasicHttpService
11+
import org.junit.runner.RunWith
12+
import org.scalamock.scalatest.MockFactory
13+
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
14+
import org.scalatest.junit.JUnitRunner
15+
16+
/**
17+
* Tests InvokerServer API.
18+
*/
19+
@RunWith(classOf[JUnitRunner])
20+
class DefaultInvokerServerTests
21+
extends FlatSpec
22+
with BeforeAndAfterEach
23+
with BeforeAndAfterAll
24+
with ScalatestRouteTest
25+
with Matchers
26+
with StreamLogging
27+
with MockFactory {
28+
29+
def transid() = TransactionId("tid")
30+
31+
val systemUsername = "username"
32+
val systemPassword = "password"
33+
34+
val reactive = new TestInvokerReactive
35+
val server = new DefaultInvokerServer(reactive, systemUsername, systemPassword)
36+
37+
override protected def afterEach(): Unit = reactive.reset()
38+
39+
/** DefaultInvokerServer API tests */
40+
behavior of "DefaultInvokerServer API"
41+
42+
it should "enable invoker" in {
43+
implicit val tid = transid()
44+
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
45+
Post(s"/enable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
46+
status should be(OK)
47+
reactive.enableCount shouldBe 1
48+
reactive.disableCount shouldBe 0
49+
}
50+
}
51+
52+
it should "disable invoker" in {
53+
implicit val tid = transid()
54+
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
55+
Post(s"/disable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
56+
status should be(OK)
57+
reactive.enableCount shouldBe 0
58+
reactive.disableCount shouldBe 1
59+
}
60+
}
61+
62+
it should "not enable invoker with invalid credential" in {
63+
implicit val tid = transid()
64+
val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
65+
Post(s"/enable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
66+
status should be(Unauthorized)
67+
reactive.enableCount shouldBe 0
68+
reactive.disableCount shouldBe 0
69+
}
70+
}
71+
72+
it should "not disable invoker with invalid credential" in {
73+
implicit val tid = transid()
74+
val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
75+
Post(s"/disable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
76+
status should be(Unauthorized)
77+
reactive.enableCount shouldBe 0
78+
reactive.disableCount shouldBe 0
79+
}
80+
}
81+
82+
it should "not enable invoker with empty credential" in {
83+
implicit val tid = transid()
84+
Post(s"/enable") ~> Route.seal(server.routes(tid)) ~> check {
85+
status should be(Unauthorized)
86+
reactive.enableCount shouldBe 0
87+
reactive.disableCount shouldBe 0
88+
}
89+
}
90+
91+
it should "not disable invoker with empty credential" in {
92+
implicit val tid = transid()
93+
Post(s"/disable") ~> Route.seal(server.routes(tid)) ~> check {
94+
status should be(Unauthorized)
95+
reactive.enableCount shouldBe 0
96+
reactive.disableCount shouldBe 0
97+
}
98+
}
99+
100+
}
101+
102+
class TestInvokerReactive extends InvokerCore with BasicHttpService {
103+
var enableCount = 0
104+
var disableCount = 0
105+
106+
override def enable(): Route = {
107+
enableCount += 1
108+
complete("")
109+
}
110+
111+
override def disable(): Route = {
112+
disableCount += 1
113+
complete("")
114+
}
115+
116+
def reset(): Unit = {
117+
enableCount = 0
118+
disableCount = 0
119+
}
120+
121+
/**
122+
* Gets the routes implemented by the HTTP service.
123+
*
124+
* @param transid the id for the transaction (every request is assigned an id)
125+
*/
126+
override def routes(implicit transid: TransactionId): Route = ???
127+
128+
}

0 commit comments

Comments
 (0)