Skip to content

Commit 9ee0b27

Browse files
committed
Update main method of the scheduler.
1 parent cf36299 commit 9ee0b27

File tree

5 files changed

+132
-23
lines changed

5 files changed

+132
-23
lines changed

common/scala/src/main/scala/org/apache/openwhisk/common/AverageRingBuffer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class AverageRingBuffer(private val maxSize: Int) {
3737

3838
def nonEmpty: Boolean = elements.nonEmpty
3939

40-
def average: Double = {
40+
def average: Double = {
4141
val size = elements.size
4242
if (size > 2) {
4343
(sum - max - min) / (size - 2)

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala

Lines changed: 114 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
package org.apache.openwhisk.core.scheduler
1919

2020
import akka.Done
21-
import akka.actor.{ActorRef, ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
21+
import akka.actor.{ActorRef, ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown, Props}
22+
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
2223
import akka.util.Timeout
24+
import akka.pattern.ask
2325
import com.typesafe.config.ConfigValueFactory
2426
import kamon.Kamon
2527
import org.apache.openwhisk.common.Https.HttpsConfig
@@ -30,8 +32,20 @@ import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
3032
import org.apache.openwhisk.core.connector._
3133
import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
3234
import org.apache.openwhisk.core.entity._
35+
import org.apache.openwhisk.core.etcd.EtcdKV.{QueueKeys, SchedulerKeys}
36+
import org.apache.openwhisk.core.etcd.EtcdType.ByteStringToString
3337
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
34-
import org.apache.openwhisk.core.service.{LeaseKeepAliveService, WatcherService}
38+
import org.apache.openwhisk.core.scheduler.container.{ContainerManager, CreationJobManager}
39+
import org.apache.openwhisk.core.scheduler.grpc.ActivationServiceImpl
40+
import org.apache.openwhisk.core.scheduler.queue.{
41+
DurationCheckerProvider,
42+
MemoryQueue,
43+
QueueManager,
44+
QueueSize,
45+
SchedulingDecisionMaker
46+
}
47+
import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
48+
import org.apache.openwhisk.grpc.ActivationServiceHandler
3549
import org.apache.openwhisk.http.BasicHttpService
3650
import org.apache.openwhisk.spi.SpiLoader
3751
import org.apache.openwhisk.utils.ExecutionContextFactory
@@ -44,6 +58,8 @@ import scala.language.postfixOps
4458
import scala.util.{Failure, Success, Try}
4559
import pureconfig.generic.auto._
4660

61+
import scala.collection.JavaConverters
62+
4763
class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(implicit config: WhiskConfig,
4864
actorSystem: ActorSystem,
4965
logging: Logging)
@@ -77,24 +93,53 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
7793
case Failure(t) => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
7894
}
7995
}
80-
val durationCheckerProvider = "" // TODO: TBD
81-
val durationChecker = "" // TODO: TBD
96+
val durationCheckerProvider = SpiLoader.get[DurationCheckerProvider]
97+
val durationChecker = durationCheckerProvider.instance(actorSystem, logging)
8298

8399
override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
84-
Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD, after etcdClient is ready, can implement it
100+
logging.info(this, s"getting the queue states")
101+
etcdClient
102+
.getPrefix(s"${QueueKeys.inProgressPrefix}/${QueueKeys.queuePrefix}")
103+
.map(res => {
104+
JavaConverters
105+
.asScalaIteratorConverter(res.getKvsList.iterator())
106+
.asScala
107+
.map(kv => ByteStringToString(kv.getValue))
108+
.count(_ == schedulerId.asString)
109+
})
110+
.flatMap { creationCount =>
111+
etcdClient
112+
.get(SchedulerKeys.scheduler(schedulerId))
113+
.map(res => {
114+
JavaConverters
115+
.asScalaIteratorConverter(res.getKvsList.iterator())
116+
.asScala
117+
.map { kv =>
118+
SchedulerStates.parse(kv.getValue).getOrElse(SchedulerStates(schedulerId, -1, schedulerEndpoints))
119+
}
120+
.map { schedulerState =>
121+
(schedulerState.sid, schedulerState.queueSize)
122+
}
123+
.toList
124+
})
125+
.map { list =>
126+
(list, creationCount)
127+
}
128+
}
85129
}
86130

87131
override def getQueueSize: Future[Int] = {
88-
Future.successful(0) // TODO: TBD, after queueManager is ready, can implement it
132+
queueManager.ask(QueueSize)(Timeout(5.seconds)).mapTo[Int]
89133
}
90134

91135
override def getQueueStatusData: Future[List[StatusData]] = {
92-
Future.successful(List(StatusData("ns", "fqn", 0, "Running", "data"))) // TODO: TBD, after queueManager is ready, can implement it
136+
queueManager.ask(StatusQuery)(Timeout(5.seconds)).mapTo[Future[List[StatusData]]].flatten
93137
}
94138

95139
override def disable(): Unit = {
96140
logging.info(this, s"Gracefully shutting down the scheduler")
97-
// TODO: TBD, after containerManager and queueManager are ready, can implement it
141+
containerManager ! GracefulShutdown
142+
queueManager ! GracefulShutdown
98143
}
99144

100145
private def getUserLimit(invocationNamespace: String): Future[Int] = {
@@ -113,27 +158,67 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
113158
}
114159
}
115160

116-
private val etcdWorkerFactory = "" // TODO: TBD
161+
private val etcdWorkerFactory = (f: ActorRefFactory) => f.actorOf(EtcdWorker.props(etcdClient, leaseService))
117162

118163
/**
119164
* This component is in charge of storing data to ETCD.
120165
* Even if any error happens we can assume the data will be eventually available in the ETCD by this component.
121166
*/
122-
val dataManagementService = "" // TODO: TBD
167+
val dataManagementService: ActorRef =
168+
actorSystem.actorOf(DataManagementService.props(watcherService, etcdWorkerFactory))
169+
170+
val feedFactory = (f: ActorRefFactory,
171+
description: String,
172+
topic: String,
173+
maxActiveAcksPerPoll: Int,
174+
processAck: Array[Byte] => Future[Unit]) => {
175+
val consumer = msgProvider.getConsumer(config, topic, topic, maxActiveAcksPerPoll)
176+
f.actorOf(Props(new MessageFeed(description, logging, consumer, maxActiveAcksPerPoll, 1.second, processAck)))
177+
}
123178

124-
val creationJobManagerFactory = "" // TODO: TBD
179+
val creationJobManagerFactory: ActorRefFactory => ActorRef =
180+
factory => {
181+
factory.actorOf(CreationJobManager.props(feedFactory, schedulerId, dataManagementService))
182+
}
125183

126184
/**
127185
* This component is responsible for creating containers for a given action.
128186
* It relies on the creationJobManager to manage the container creation job.
129187
*/
130-
val containerManager = "" // TODO: TBD
188+
val containerManager: ActorRef =
189+
actorSystem.actorOf(
190+
ContainerManager.props(creationJobManagerFactory, msgProvider, schedulerId, etcdClient, config, watcherService))
131191

132192
/**
133193
* This is a factory to create memory queues.
134194
* In the new architecture, each action is given its own dedicated queue.
135195
*/
136-
val memoryQueueFactory = "" // TODO: TBD
196+
val memoryQueueFactory
197+
: (ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, WhiskActionMetaData) => ActorRef =
198+
(factory, invocationNamespace, fqn, revision, actionMetaData) => {
199+
// Todo: Change this to SPI
200+
val decisionMaker = factory.actorOf(SchedulingDecisionMaker.props(invocationNamespace, fqn))
201+
202+
factory.actorOf(
203+
MemoryQueue.props(
204+
etcdClient,
205+
durationChecker,
206+
fqn,
207+
producer,
208+
config,
209+
invocationNamespace,
210+
revision,
211+
schedulerEndpoints,
212+
actionMetaData,
213+
dataManagementService,
214+
watcherService,
215+
containerManager,
216+
decisionMaker,
217+
schedulerId: SchedulerInstanceId,
218+
ack,
219+
store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
220+
getUserLimit: String => Future[Int]))
221+
}
137222

138223
val topic = s"${Scheduler.topicPrefix}scheduler${schedulerId.asString}"
139224
val schedulerConsumer =
@@ -144,9 +229,22 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
144229
/**
145230
* This is one of the major components which take charge of managing queues and coordinating requests among the scheduler, controllers, and invokers.
146231
*/
147-
val queueManager = "" // TODO: TBD
148-
149-
//val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl()) TODO: TBD
232+
val queueManager = actorSystem.actorOf(
233+
QueueManager.props(
234+
entityStore,
235+
WhiskActionMetaData.get,
236+
etcdClient,
237+
schedulerEndpoints,
238+
schedulerId,
239+
dataManagementService,
240+
watcherService,
241+
ack,
242+
store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
243+
memoryQueueFactory,
244+
schedulerConsumer),
245+
QueueManager.actorName)
246+
247+
val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl())
150248
}
151249

152250
case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ case object GetPoolStatus
4545

4646
case class JobEntry(action: FullyQualifiedEntityName, timer: Cancellable)
4747

48-
class CreationJobManager(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
48+
class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
4949
schedulerInstanceId: SchedulerInstanceId,
5050
dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging)
5151
extends Actor {
@@ -201,7 +201,7 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, Int, Array[Byte]
201201
private val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
202202
private val topic = s"${topicPrefix}creationAck${schedulerInstanceId.asString}"
203203
private val maxActiveAcksPerPoll = 128
204-
private val ackFeed = feedFactory(actorSystem, topic, maxActiveAcksPerPoll, processAcknowledgement)
204+
private val ackFeed = feedFactory(actorSystem, "creationAck", topic, maxActiveAcksPerPoll, processAcknowledgement)
205205

206206
def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = {
207207
Future(ContainerCreationAckMessage.parse(new String(bytes, StandardCharsets.UTF_8)))
@@ -224,7 +224,7 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, Int, Array[Byte]
224224
}
225225

226226
object CreationJobManager {
227-
def props(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
227+
def props(feedFactory: (ActorRefFactory, String, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
228228
schedulerInstanceId: SchedulerInstanceId,
229229
dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging) =
230230
Props(new CreationJobManager(feedFactory, schedulerInstanceId, dataManagementService))

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,19 @@ import org.apache.openwhisk.core.etcd.EtcdClient
3434
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
3535
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys}
3636
import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
37-
import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob}
37+
import org.apache.openwhisk.core.scheduler.message.{
38+
ContainerCreation,
39+
ContainerDeletion,
40+
FailedCreationJob,
41+
SuccessfulCreationJob
42+
}
3843
import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, ActivationResponse => GetActivationResponse}
39-
import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob}
44+
import org.apache.openwhisk.core.scheduler.message.{
45+
ContainerCreation,
46+
ContainerDeletion,
47+
FailedCreationJob,
48+
SuccessfulCreationJob
49+
}
4050
import org.apache.openwhisk.core.service._
4151
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
4252
import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests}
@@ -48,7 +58,7 @@ import scala.annotation.tailrec
4858
import scala.collection.immutable.Queue
4959
import scala.collection.mutable
5060
import scala.concurrent.duration._
51-
import scala.concurrent.{ExecutionContextExecutor, Future, Promise, duration}
61+
import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise}
5262
import scala.language.postfixOps
5363
import scala.util.{Failure, Success}
5464

tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ class CreationJobManagerTests
9595
}
9696

9797
def feedFactory(actorRefFactory: ActorRefFactory,
98+
description: String,
9899
topic: String,
99100
maxActiveAcksPerPoll: Int,
100101
handler: Array[Byte] => Future[Unit]): ActorRef = {

0 commit comments

Comments
 (0)