18
18
package org .apache .openwhisk .core .scheduler
19
19
20
20
import akka .Done
21
- import akka .actor .{ActorRef , ActorRefFactory , ActorSelection , ActorSystem , CoordinatedShutdown }
21
+ import akka .actor .{ActorRef , ActorRefFactory , ActorSelection , ActorSystem , CoordinatedShutdown , Props }
22
+ import akka .http .scaladsl .model .{HttpRequest , HttpResponse }
22
23
import akka .util .Timeout
24
+ import akka .pattern .ask
23
25
import com .typesafe .config .ConfigValueFactory
24
26
import kamon .Kamon
25
27
import org .apache .openwhisk .common .Https .HttpsConfig
@@ -30,8 +32,20 @@ import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
30
32
import org .apache .openwhisk .core .connector ._
31
33
import org .apache .openwhisk .core .database .{ActivationStoreProvider , NoDocumentException , UserContext }
32
34
import org .apache .openwhisk .core .entity ._
35
+ import org .apache .openwhisk .core .etcd .EtcdKV .{QueueKeys , SchedulerKeys }
36
+ import org .apache .openwhisk .core .etcd .EtcdType .ByteStringToString
33
37
import org .apache .openwhisk .core .etcd .{EtcdClient , EtcdConfig }
34
- import org .apache .openwhisk .core .service .{LeaseKeepAliveService , WatcherService }
38
+ import org .apache .openwhisk .core .scheduler .container .{ContainerManager , CreationJobManager }
39
+ import org .apache .openwhisk .core .scheduler .grpc .ActivationServiceImpl
40
+ import org .apache .openwhisk .core .scheduler .queue .{
41
+ DurationCheckerProvider ,
42
+ MemoryQueue ,
43
+ QueueManager ,
44
+ QueueSize ,
45
+ SchedulingDecisionMaker
46
+ }
47
+ import org .apache .openwhisk .core .service .{DataManagementService , EtcdWorker , LeaseKeepAliveService , WatcherService }
48
+ import org .apache .openwhisk .grpc .ActivationServiceHandler
35
49
import org .apache .openwhisk .http .BasicHttpService
36
50
import org .apache .openwhisk .spi .SpiLoader
37
51
import org .apache .openwhisk .utils .ExecutionContextFactory
@@ -44,6 +58,8 @@ import scala.language.postfixOps
44
58
import scala .util .{Failure , Success , Try }
45
59
import pureconfig .generic .auto ._
46
60
61
+ import scala .collection .JavaConverters
62
+
47
63
class Scheduler (schedulerId : SchedulerInstanceId , schedulerEndpoints : SchedulerEndpoints )(implicit config : WhiskConfig ,
48
64
actorSystem : ActorSystem ,
49
65
logging : Logging )
@@ -77,24 +93,53 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
77
93
case Failure (t) => logging.error(this , s " failed to save activation $activation, error: ${t.getMessage}" )
78
94
}
79
95
}
80
- val durationCheckerProvider = " " // TODO: TBD
81
- val durationChecker = " " // TODO: TBD
96
+ val durationCheckerProvider = SpiLoader .get[ DurationCheckerProvider ]
97
+ val durationChecker = durationCheckerProvider.instance(actorSystem, logging)
82
98
83
99
override def getState : Future [(List [(SchedulerInstanceId , Int )], Int )] = {
84
- Future .successful((List ((schedulerId, 0 )), 0 )) // TODO: TBD, after etcdClient is ready, can implement it
100
+ logging.info(this , s " getting the queue states " )
101
+ etcdClient
102
+ .getPrefix(s " ${QueueKeys .inProgressPrefix}/ ${QueueKeys .queuePrefix}" )
103
+ .map(res => {
104
+ JavaConverters
105
+ .asScalaIteratorConverter(res.getKvsList.iterator())
106
+ .asScala
107
+ .map(kv => ByteStringToString (kv.getValue))
108
+ .count(_ == schedulerId.asString)
109
+ })
110
+ .flatMap { creationCount =>
111
+ etcdClient
112
+ .get(SchedulerKeys .scheduler(schedulerId))
113
+ .map(res => {
114
+ JavaConverters
115
+ .asScalaIteratorConverter(res.getKvsList.iterator())
116
+ .asScala
117
+ .map { kv =>
118
+ SchedulerStates .parse(kv.getValue).getOrElse(SchedulerStates (schedulerId, - 1 , schedulerEndpoints))
119
+ }
120
+ .map { schedulerState =>
121
+ (schedulerState.sid, schedulerState.queueSize)
122
+ }
123
+ .toList
124
+ })
125
+ .map { list =>
126
+ (list, creationCount)
127
+ }
128
+ }
85
129
}
86
130
87
131
override def getQueueSize : Future [Int ] = {
88
- Future .successful( 0 ) // TODO: TBD, after queueManager is ready, can implement it
132
+ queueManager.ask( QueueSize )( Timeout ( 5 .seconds)).mapTo[ Int ]
89
133
}
90
134
91
135
override def getQueueStatusData : Future [List [StatusData ]] = {
92
- Future .successful( List ( StatusData ( " ns " , " fqn " , 0 , " Running " , " data " ))) // TODO: TBD, after queueManager is ready, can implement it
136
+ queueManager.ask( StatusQuery )( Timeout ( 5 .seconds)).mapTo[ Future [ List [ StatusData ]]].flatten
93
137
}
94
138
95
139
override def disable (): Unit = {
96
140
logging.info(this , s " Gracefully shutting down the scheduler " )
97
- // TODO: TBD, after containerManager and queueManager are ready, can implement it
141
+ containerManager ! GracefulShutdown
142
+ queueManager ! GracefulShutdown
98
143
}
99
144
100
145
private def getUserLimit (invocationNamespace : String ): Future [Int ] = {
@@ -113,27 +158,67 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
113
158
}
114
159
}
115
160
116
- private val etcdWorkerFactory = " " // TODO: TBD
161
+ private val etcdWorkerFactory = ( f : ActorRefFactory ) => f.actorOf( EtcdWorker .props(etcdClient, leaseService))
117
162
118
163
/**
119
164
* This component is in charge of storing data to ETCD.
120
165
* Even if any error happens we can assume the data will be eventually available in the ETCD by this component.
121
166
*/
122
- val dataManagementService = " " // TODO: TBD
167
+ val dataManagementService : ActorRef =
168
+ actorSystem.actorOf(DataManagementService .props(watcherService, etcdWorkerFactory))
169
+
170
+ val feedFactory = (f : ActorRefFactory ,
171
+ description : String ,
172
+ topic : String ,
173
+ maxActiveAcksPerPoll : Int ,
174
+ processAck : Array [Byte ] => Future [Unit ]) => {
175
+ val consumer = msgProvider.getConsumer(config, topic, topic, maxActiveAcksPerPoll)
176
+ f.actorOf(Props (new MessageFeed (description, logging, consumer, maxActiveAcksPerPoll, 1 .second, processAck)))
177
+ }
123
178
124
- val creationJobManagerFactory = " " // TODO: TBD
179
+ val creationJobManagerFactory : ActorRefFactory => ActorRef =
180
+ factory => {
181
+ factory.actorOf(CreationJobManager .props(feedFactory, schedulerId, dataManagementService))
182
+ }
125
183
126
184
/**
127
185
* This component is responsible for creating containers for a given action.
128
186
* It relies on the creationJobManager to manage the container creation job.
129
187
*/
130
- val containerManager = " " // TODO: TBD
188
+ val containerManager : ActorRef =
189
+ actorSystem.actorOf(
190
+ ContainerManager .props(creationJobManagerFactory, msgProvider, schedulerId, etcdClient, config, watcherService))
131
191
132
192
/**
133
193
* This is a factory to create memory queues.
134
194
* In the new architecture, each action is given its own dedicated queue.
135
195
*/
136
- val memoryQueueFactory = " " // TODO: TBD
196
+ val memoryQueueFactory
197
+ : (ActorRefFactory , String , FullyQualifiedEntityName , DocRevision , WhiskActionMetaData ) => ActorRef =
198
+ (factory, invocationNamespace, fqn, revision, actionMetaData) => {
199
+ // Todo: Change this to SPI
200
+ val decisionMaker = factory.actorOf(SchedulingDecisionMaker .props(invocationNamespace, fqn))
201
+
202
+ factory.actorOf(
203
+ MemoryQueue .props(
204
+ etcdClient,
205
+ durationChecker,
206
+ fqn,
207
+ producer,
208
+ config,
209
+ invocationNamespace,
210
+ revision,
211
+ schedulerEndpoints,
212
+ actionMetaData,
213
+ dataManagementService,
214
+ watcherService,
215
+ containerManager,
216
+ decisionMaker,
217
+ schedulerId : SchedulerInstanceId ,
218
+ ack,
219
+ store : (TransactionId , WhiskActivation , UserContext ) => Future [Any ],
220
+ getUserLimit : String => Future [Int ]))
221
+ }
137
222
138
223
val topic = s " ${Scheduler .topicPrefix}scheduler ${schedulerId.asString}"
139
224
val schedulerConsumer =
@@ -144,9 +229,22 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
144
229
/**
145
230
* This is one of the major components which take charge of managing queues and coordinating requests among the scheduler, controllers, and invokers.
146
231
*/
147
- val queueManager = " " // TODO: TBD
148
-
149
- // val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl()) TODO: TBD
232
+ val queueManager = actorSystem.actorOf(
233
+ QueueManager .props(
234
+ entityStore,
235
+ WhiskActionMetaData .get,
236
+ etcdClient,
237
+ schedulerEndpoints,
238
+ schedulerId,
239
+ dataManagementService,
240
+ watcherService,
241
+ ack,
242
+ store : (TransactionId , WhiskActivation , UserContext ) => Future [Any ],
243
+ memoryQueueFactory,
244
+ schedulerConsumer),
245
+ QueueManager .actorName)
246
+
247
+ val serviceHandlers : HttpRequest => Future [HttpResponse ] = ActivationServiceHandler .apply(ActivationServiceImpl ())
150
248
}
151
249
152
250
case class CmdLineArgs (uniqueName : Option [String ] = None , id : Option [Int ] = None , displayedName : Option [String ] = None )
0 commit comments