-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[New Scheduler]Implement PFCInvokerServer #5098
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -153,6 +153,7 @@ class InvokerHealthManager(instanceId: InvokerInstanceId, | |
data.memory.inProgressMemory, | ||
instanceId.tags, | ||
instanceId.dedicatedNamespaces) | ||
InvokerHealthManager.useMemory = invokerResourceMessage.busyMemory + invokerResourceMessage.inProgressMemory | ||
dataManagementService ! UpdateDataOnChange(InvokerKeys.health(instanceId), invokerResourceMessage.serialize) | ||
stay using data.copy(currentInvokerResource = Some(invokerResourceMessage)) | ||
|
||
|
@@ -269,6 +270,7 @@ object InvokerHealthManager { | |
val healthActionNamePrefix = "invokerHealthTestAction" | ||
val bufferSize = 10 | ||
val bufferErrorTolerance = 3 | ||
var useMemory = 0l | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, good catch, as i said above, i removed /memory api in this pr. i will add |
||
val healthActionIdentity: Identity = { | ||
val whiskSystem = "whisk.system" | ||
val uuid = UUID() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.openwhisk.core.invoker | ||
|
||
import akka.actor.ActorSystem | ||
import akka.http.scaladsl.model.StatusCodes | ||
import akka.http.scaladsl.model.headers.BasicHttpCredentials | ||
import akka.http.scaladsl.server.Route | ||
import org.apache.openwhisk.common.{Logging, TransactionId} | ||
import org.apache.openwhisk.http.BasicRasService | ||
import org.apache.openwhisk.http.ErrorResponse.terminate | ||
import spray.json.PrettyPrinter | ||
|
||
import scala.concurrent.ExecutionContext | ||
|
||
/** | ||
* Implements web server to handle certain REST API calls. | ||
*/ | ||
class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, systemPassword: String)( | ||
implicit val ec: ExecutionContext, | ||
val actorSystem: ActorSystem, | ||
val logger: Logging) | ||
extends BasicRasService { | ||
|
||
/** Pretty print JSON response. */ | ||
implicit val jsonPrettyResponsePrinter = PrettyPrinter | ||
|
||
override def routes(implicit transid: TransactionId): Route = { | ||
super.routes ~ extractCredentials { | ||
case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword => | ||
(path("enable") & post) { | ||
invoker.enable() | ||
} ~ (path("disable") & post) { | ||
invoker.disable() | ||
} | ||
case _ => terminate(StatusCodes.Unauthorized) | ||
} | ||
} | ||
} | ||
|
||
object DefaultInvokerServer extends InvokerServerProvider { | ||
|
||
// TODO: TBD, after FPCInvokerReactive is ready, can read the credentials from pureconfig | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not a problem for this mr, but I don't think that's a secure way of setting admin credentials There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, after FPCInvokerReactive is ready, can read the credentials from pureconfig like other components. |
||
val invokerUsername = "admin" | ||
val invokerPassword = "admin" | ||
|
||
override def instance( | ||
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService = | ||
new DefaultInvokerServer(invoker, invokerUsername, invokerPassword) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.openwhisk.core.invoker | ||
|
||
import akka.actor.ActorSystem | ||
import akka.http.scaladsl.model.StatusCodes | ||
import akka.http.scaladsl.model.headers.BasicHttpCredentials | ||
import akka.http.scaladsl.server.Route | ||
import org.apache.openwhisk.common.{Logging, TransactionId} | ||
import org.apache.openwhisk.core.containerpool.v2.InvokerHealthManager | ||
import org.apache.openwhisk.http.BasicRasService | ||
import org.apache.openwhisk.http.ErrorResponse.terminate | ||
import spray.json.PrettyPrinter | ||
|
||
import scala.concurrent.ExecutionContext | ||
|
||
/** | ||
* Implements web server to handle certain REST API calls. | ||
*/ | ||
class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemPassword: String)( | ||
implicit val ec: ExecutionContext, | ||
val actorSystem: ActorSystem, | ||
val logger: Logging) | ||
extends BasicRasService { | ||
|
||
/** Pretty print JSON response. */ | ||
implicit val jsonPrettyResponsePrinter = PrettyPrinter | ||
|
||
override def routes(implicit transid: TransactionId): Route = { | ||
super.routes ~ extractCredentials { | ||
case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword => | ||
(path("enable") & post) { | ||
invoker.enable() | ||
} ~ (path("disable") & post) { | ||
invoker.disable() | ||
} ~ (path("memory") & get) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Regarding For DefaultInvokerServer, can't use /memory to judge whether the invoker is idle, this is another topic for this. btw, we judge it like this, get every controller slot of the invoker, and do |
||
complete(InvokerHealthManager.useMemory.toString) | ||
} | ||
case _ => terminate(StatusCodes.Unauthorized) | ||
} | ||
} | ||
} | ||
|
||
object FPCInvokerServer extends InvokerServerProvider { | ||
|
||
// TODO: TBD, after FPCInvokerReactive is ready, can read the credentials from pureconfig | ||
val invokerUsername = "admin" | ||
val invokerPassword = "admin" | ||
|
||
override def instance( | ||
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService = | ||
new FPCInvokerServer(invoker, invokerUsername, invokerPassword) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.invoker | |
|
||
import akka.Done | ||
import akka.actor.{ActorSystem, CoordinatedShutdown} | ||
import akka.http.scaladsl.server.Route | ||
import akka.stream.ActorMaterializer | ||
import com.typesafe.config.ConfigValueFactory | ||
import kamon.Kamon | ||
|
@@ -217,7 +218,10 @@ trait InvokerProvider extends Spi { | |
} | ||
|
||
// this trait can be used to add common implementation | ||
trait InvokerCore {} | ||
trait InvokerCore { | ||
def enable(): Route | ||
def disable(): Route | ||
} | ||
|
||
/** | ||
* An Spi for providing RestAPI implementation for invoker. | ||
|
@@ -227,9 +231,3 @@ trait InvokerServerProvider extends Spi { | |
def instance( | ||
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService | ||
} | ||
|
||
object DefaultInvokerServer extends InvokerServerProvider { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I moved DefaultInvokerServer to a separate file, keep the same as FPCInvokerServer. |
||
override def instance( | ||
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService = | ||
new BasicRasService {} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ import java.time.Instant | |
import akka.Done | ||
import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props} | ||
import akka.event.Logging.InfoLevel | ||
import akka.http.scaladsl.server.Route | ||
import akka.stream.ActorMaterializer | ||
import org.apache.openwhisk.common._ | ||
import org.apache.openwhisk.common.tracing.WhiskTracerProvider | ||
|
@@ -299,4 +300,8 @@ class InvokerReactive( | |
} | ||
}) | ||
|
||
override def enable(): Route = ??? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I haven't tried it but would it be ok to leave these methods empty to run it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, you mean implement it like below?
Due to InvokerReactive extends InvokerCore which has There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when deploy using |
||
|
||
override def disable(): Route = ??? | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.openwhisk.core.invoker.test | ||
|
||
import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized} | ||
import akka.http.scaladsl.model.headers.BasicHttpCredentials | ||
import akka.http.scaladsl.server.Route | ||
import akka.http.scaladsl.testkit.ScalatestRouteTest | ||
import common.StreamLogging | ||
import org.apache.openwhisk.common.TransactionId | ||
import org.apache.openwhisk.core.invoker.{DefaultInvokerServer, InvokerCore} | ||
import org.apache.openwhisk.http.BasicHttpService | ||
import org.junit.runner.RunWith | ||
import org.scalamock.scalatest.MockFactory | ||
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers} | ||
import org.scalatest.junit.JUnitRunner | ||
|
||
/** | ||
* Tests InvokerServer API. | ||
*/ | ||
@RunWith(classOf[JUnitRunner]) | ||
class DefaultInvokerServerTests | ||
extends FlatSpec | ||
with BeforeAndAfterEach | ||
with BeforeAndAfterAll | ||
with ScalatestRouteTest | ||
with Matchers | ||
with StreamLogging | ||
with MockFactory { | ||
|
||
def transid() = TransactionId("tid") | ||
|
||
val systemUsername = "username" | ||
val systemPassword = "password" | ||
|
||
val reactive = new TestInvokerReactive | ||
val server = new DefaultInvokerServer(reactive, systemUsername, systemPassword) | ||
|
||
override protected def afterEach(): Unit = reactive.reset() | ||
|
||
/** DefaultInvokerServer API tests */ | ||
behavior of "DefaultInvokerServer API" | ||
|
||
it should "enable invoker" in { | ||
implicit val tid = transid() | ||
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword) | ||
Post(s"/enable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { | ||
status should be(OK) | ||
reactive.enableCount shouldBe 1 | ||
reactive.disableCount shouldBe 0 | ||
} | ||
} | ||
|
||
it should "disable invoker" in { | ||
implicit val tid = transid() | ||
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword) | ||
Post(s"/disable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check { | ||
status should be(OK) | ||
reactive.enableCount shouldBe 0 | ||
reactive.disableCount shouldBe 1 | ||
} | ||
} | ||
|
||
it should "not enable invoker with invalid credential" in { | ||
implicit val tid = transid() | ||
val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass") | ||
Post(s"/enable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check { | ||
status should be(Unauthorized) | ||
reactive.enableCount shouldBe 0 | ||
reactive.disableCount shouldBe 0 | ||
} | ||
} | ||
|
||
it should "not disable invoker with invalid credential" in { | ||
implicit val tid = transid() | ||
val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass") | ||
Post(s"/disable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check { | ||
status should be(Unauthorized) | ||
reactive.enableCount shouldBe 0 | ||
reactive.disableCount shouldBe 0 | ||
} | ||
} | ||
|
||
it should "not enable invoker with empty credential" in { | ||
implicit val tid = transid() | ||
Post(s"/enable") ~> Route.seal(server.routes(tid)) ~> check { | ||
status should be(Unauthorized) | ||
reactive.enableCount shouldBe 0 | ||
reactive.disableCount shouldBe 0 | ||
} | ||
} | ||
|
||
it should "not disable invoker with empty credential" in { | ||
implicit val tid = transid() | ||
Post(s"/disable") ~> Route.seal(server.routes(tid)) ~> check { | ||
status should be(Unauthorized) | ||
reactive.enableCount shouldBe 0 | ||
reactive.disableCount shouldBe 0 | ||
} | ||
} | ||
|
||
} | ||
|
||
class TestInvokerReactive extends InvokerCore with BasicHttpService { | ||
var enableCount = 0 | ||
var disableCount = 0 | ||
|
||
override def enable(): Route = { | ||
enableCount += 1 | ||
complete("") | ||
} | ||
|
||
override def disable(): Route = { | ||
disableCount += 1 | ||
complete("") | ||
} | ||
|
||
def reset(): Unit = { | ||
enableCount = 0 | ||
disableCount = 0 | ||
} | ||
|
||
/** | ||
* Gets the routes implemented by the HTTP service. | ||
* | ||
* @param transid the id for the transaction (every request is assigned an id) | ||
*/ | ||
override def routes(implicit transid: TransactionId): Route = ??? | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this mutable variable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There has a get used memory API: http://$invoker:$port/memory,
The FunctionPullingContainerPool actor(which will be contributed) sends the pool's used memory info(busyMemory + inprogressMemory) to InvokerHealthyManager actor, so use a mutable variable to store the value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that's there only one instance of the invoker health manager per instance / jvm, but it still seems risky to be mutating a var in an object like this especially when it's the source of truth on whether the invoker is idle or not.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@style95 @bdoyle0182 , yes, not a good idea of
use mutable variable
It is better to do like below
But functionPullingContainerPool is implementing now, so i removed /memory api in this pr: 86f5531
and i will add /memory api in
[New Scheduler] FunctionPullingContainerPool
pr.