Skip to content

Commit 43caa23

Browse files
committed
Implement PFCInvokerServer
1 parent 59b67fe commit 43caa23

File tree

12 files changed

+524
-13
lines changed

12 files changed

+524
-13
lines changed

ansible/group_vars/all

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,8 @@ invoker:
219219
keystore:
220220
password: "{{ invoker_keystore_password | default('openwhisk') }}"
221221
name: "{{ __invoker_ssl_keyPrefix }}openwhisk-keystore.p12"
222+
reactiveSpi: "{{ invokerReactive_spi | default('') }}"
223+
serverSpi: "{{ invokerServer_spi | default('') }}"
222224

223225
userLogs:
224226
spi: "{{ userLogs_spi | default('org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider') }}"
@@ -453,5 +455,5 @@ etcd_connect_string: "{% set ret = [] %}\
453455

454456
scheduler:
455457
dataManagementService:
456-
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default(1 second) }}"
458+
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
457459

ansible/roles/invoker/tasks/deploy.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@
259259
"CONFIG_kamon_statsd_hostname": "{{ metrics.kamon.host }}"
260260
"CONFIG_kamon_statsd_port": "{{ metrics.kamon.port }}"
261261
"CONFIG_whisk_spi_LogStoreProvider": "{{ userLogs.spi }}"
262+
"CONFIG_whisk_spi_InvokerProvider": "{{ invoker.reactiveSpi }}"
263+
"CONFIG_whisk_spi_InvokerServerProvider": "{{ invoker.serverSpi }}"
262264
"CONFIG_logback_log_level": "{{ invoker.loglevel }}"
263265
"CONFIG_whisk_memory_min": "{{ limit_action_memory_min | default() }}"
264266
"CONFIG_whisk_memory_max": "{{ limit_action_memory_max | default() }}"

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,26 @@ class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef
175175
val raw = new String(bytes, StandardCharsets.UTF_8)
176176
PingMessage.parse(raw) match {
177177
case Success(p: PingMessage) =>
178-
self ! p
179-
invokerPingFeed ! MessageFeed.Processed
178+
if (p.instance.instance < 0) {
179+
// Set the invoker status to UnHealthy immediately. Because if controller receives the success result of
180+
// action execute continuously, it will judge that invoker as up status all the time(It will not judge
181+
// the invoker status by timeout mechanism)
182+
logging.info(this, s"get invoker unhealthy message: invoker${Math.abs(p.instance.toInt) - 1}")
183+
status = status.updated(
184+
Math.abs(p.instance.toInt) - 1,
185+
new InvokerHealth(
186+
InvokerInstanceId(
187+
Math.abs(p.instance.toInt) - 1,
188+
p.instance.uniqueName,
189+
p.instance.displayedName,
190+
p.instance.userMemory),
191+
Unhealthy))
192+
logStatus()
193+
invokerPingFeed ! MessageFeed.Processed
194+
} else {
195+
self ! p
196+
invokerPingFeed ! MessageFeed.Processed
197+
}
180198

181199
case Failure(t) =>
182200
invokerPingFeed ! MessageFeed.Processed

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ class InvokerHealthManager(instanceId: InvokerInstanceId,
153153
data.memory.inProgressMemory,
154154
instanceId.tags,
155155
instanceId.dedicatedNamespaces)
156+
InvokerHealthManager.useMemory = invokerResourceMessage.busyMemory + invokerResourceMessage.inProgressMemory
156157
dataManagementService ! UpdateDataOnChange(InvokerKeys.health(instanceId), invokerResourceMessage.serialize)
157158
stay using data.copy(currentInvokerResource = Some(invokerResourceMessage))
158159

@@ -269,6 +270,7 @@ object InvokerHealthManager {
269270
val healthActionNamePrefix = "invokerHealthTestAction"
270271
val bufferSize = 10
271272
val bufferErrorTolerance = 3
273+
var useMemory = 0l
272274
val healthActionIdentity: Identity = {
273275
val whiskSystem = "whisk.system"
274276
val uuid = UUID()
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package org.apache.openwhisk.core.invoker
2+
3+
import akka.actor.ActorSystem
4+
import akka.http.scaladsl.model.StatusCodes
5+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
6+
import akka.http.scaladsl.server.Route
7+
import org.apache.openwhisk.common.{Logging, TransactionId}
8+
import org.apache.openwhisk.http.BasicRasService
9+
import org.apache.openwhisk.http.ErrorResponse.terminate
10+
import spray.json.PrettyPrinter
11+
12+
import scala.concurrent.ExecutionContext
13+
14+
/**
15+
* Implements web server to handle certain REST API calls.
16+
*/
17+
class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, systemPassword: String)(
18+
implicit val ec: ExecutionContext,
19+
val actorSystem: ActorSystem,
20+
val logger: Logging)
21+
extends BasicRasService {
22+
23+
/** Pretty print JSON response. */
24+
implicit val jsonPrettyResponsePrinter = PrettyPrinter
25+
26+
override def routes(implicit transid: TransactionId): Route = {
27+
super.routes ~ extractCredentials {
28+
case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
29+
(path("enable") & post) {
30+
invoker.enable()
31+
} ~ (path("disable") & post) {
32+
invoker.disable()
33+
}
34+
case _ => terminate(StatusCodes.Unauthorized)
35+
}
36+
}
37+
}
38+
39+
object DefaultInvokerServer extends InvokerServerProvider {
40+
41+
// TODO: TBD, after FPCInvokerReactive is ready, can read the credentials from pureconfig
42+
val invokerUsername = "admin"
43+
val invokerPassword = "admin"
44+
45+
override def instance(
46+
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
47+
new DefaultInvokerServer(invoker, invokerUsername, invokerPassword)
48+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package org.apache.openwhisk.core.invoker
2+
3+
import akka.actor.ActorSystem
4+
import akka.http.scaladsl.model.StatusCodes
5+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
6+
import akka.http.scaladsl.server.Route
7+
import org.apache.openwhisk.common.{Logging, TransactionId}
8+
import org.apache.openwhisk.core.containerpool.v2.InvokerHealthManager
9+
import org.apache.openwhisk.http.BasicRasService
10+
import org.apache.openwhisk.http.ErrorResponse.terminate
11+
import spray.json.PrettyPrinter
12+
13+
import scala.concurrent.ExecutionContext
14+
15+
/**
16+
* Implements web server to handle certain REST API calls.
17+
*/
18+
class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemPassword: String)(
19+
implicit val ec: ExecutionContext,
20+
val actorSystem: ActorSystem,
21+
val logger: Logging)
22+
extends BasicRasService {
23+
24+
/** Pretty print JSON response. */
25+
implicit val jsonPrettyResponsePrinter = PrettyPrinter
26+
27+
override def routes(implicit transid: TransactionId): Route = {
28+
super.routes ~ extractCredentials {
29+
case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
30+
(path("enable") & post) {
31+
invoker.enable()
32+
} ~ (path("disable") & post) {
33+
invoker.disable()
34+
} ~ (path("memory") & get) {
35+
complete(InvokerHealthManager.useMemory.toString)
36+
}
37+
case _ => terminate(StatusCodes.Unauthorized)
38+
}
39+
}
40+
}
41+
42+
object FPCInvokerServer extends InvokerServerProvider {
43+
44+
// TODO: TBD, after FPCInvokerReactive is ready, can read the credentials from pureconfig
45+
val invokerUsername = "admin"
46+
val invokerPassword = "admin"
47+
48+
override def instance(
49+
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
50+
new FPCInvokerServer(invoker, invokerUsername, invokerPassword)
51+
}

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.invoker
1919

2020
import akka.Done
2121
import akka.actor.{ActorSystem, CoordinatedShutdown}
22+
import akka.http.scaladsl.server.Route
2223
import akka.stream.ActorMaterializer
2324
import com.typesafe.config.ConfigValueFactory
2425
import kamon.Kamon
@@ -217,7 +218,10 @@ trait InvokerProvider extends Spi {
217218
}
218219

219220
// this trait can be used to add common implementation
220-
trait InvokerCore {}
221+
trait InvokerCore {
222+
def enable(): Route
223+
def disable(): Route
224+
}
221225

222226
/**
223227
* An Spi for providing RestAPI implementation for invoker.
@@ -227,9 +231,3 @@ trait InvokerServerProvider extends Spi {
227231
def instance(
228232
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService
229233
}
230-
231-
object DefaultInvokerServer extends InvokerServerProvider {
232-
override def instance(
233-
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
234-
new BasicRasService {}
235-
}

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import java.nio.charset.StandardCharsets
2121
import java.time.Instant
2222

2323
import akka.Done
24-
import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
24+
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, CoordinatedShutdown, PoisonPill, Props}
2525
import akka.event.Logging.InfoLevel
26+
import akka.http.scaladsl.server.Directives.complete
27+
import akka.http.scaladsl.server.Route
2628
import akka.stream.ActorMaterializer
2729
import org.apache.openwhisk.common._
2830
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
@@ -293,10 +295,52 @@ class InvokerReactive(
293295
}
294296

295297
private val healthProducer = msgProvider.getProducer(config)
296-
Scheduler.scheduleWaitAtMost(1.seconds)(() => {
298+
var scheduler: Option[ActorRef] = Some(Scheduler.scheduleWaitAtMost(1.seconds)(() => {
297299
healthProducer.send("health", PingMessage(instance)).andThen {
298300
case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
299301
}
300-
})
302+
}))
303+
304+
override def enable(): Route = {
305+
scheduler match {
306+
case Some(_) => complete("Can't enable invoker again");
307+
case None =>
308+
scheduler = Some(Scheduler.scheduleWaitAtMost(1.seconds)(() => {
309+
healthProducer
310+
.send("health", PingMessage(instance))
311+
.andThen {
312+
case Failure(t) => logging.error(this, s"enable invoker, failed to ping the controller: $t")
313+
}
314+
}))
315+
complete("Success enable invoker");
316+
}
317+
}
318+
319+
override def disable(): Route = {
320+
scheduler match {
321+
case Some(schedulerActor) =>
322+
// Through negative number of invoker to pass unhealthy message
323+
healthProducer
324+
.send(
325+
"health",
326+
PingMessage(
327+
InvokerInstanceId(
328+
-1 - instance.instance,
329+
instance.uniqueName,
330+
instance.displayedName,
331+
instance.userMemory)),
332+
3)
333+
.andThen {
334+
case Success(_) =>
335+
schedulerActor ! PoisonPill
336+
scheduler = None
337+
logging.info(this, "send unhealthy message successfully")
338+
case Failure(_) =>
339+
logging.info(this, "failed to send unhealthy message")
340+
}
341+
complete("Success disable invoker");
342+
case None => complete("Can't disable invoker again")
343+
}
344+
}
301345

302346
}

tests/src/test/scala/common/WhiskProperties.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,10 @@ public static String getBaseControllerAddress() {
270270
return getBaseControllerHost() + ":" + getControllerBasePort();
271271
}
272272

273+
public static String getBaseInvokerAddress(){
274+
return getInvokerHosts()[0] + ":" + whiskProperties.getProperty("invoker.hosts.basePort");
275+
}
276+
273277
public static int getMaxActionInvokesPerMinute() {
274278
String valStr = whiskProperties.getProperty("limits.actions.invokes.perMinute");
275279
return Integer.parseInt(valStr);

0 commit comments

Comments
 (0)