Skip to content

Commit e05aa44

Browse files
authored
[New Scheduler] Implement KeepAliveService (#5067)
* Add KeepAliveService * Add KeepAliveService to Scheduler * Update name for case class * Include workers in state data to be thread safe
1 parent cc9bc49 commit e05aa44

File tree

5 files changed

+428
-7
lines changed

5 files changed

+428
-7
lines changed

common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ object LoggingMarkers {
338338
val timeout = "timeout"
339339

340340
private val controller = "controller"
341+
private val scheduler = "scheduler"
341342
private val invoker = "invoker"
342343
private val database = "database"
343344
private val activation = "activation"
@@ -555,6 +556,9 @@ object LoggingMarkers {
555556
LogMarkerToken(kafka, "topic", start, Some("delay"), Map("topic" -> topic))(MeasurementUnit.time.milliseconds)
556557
else LogMarkerToken(kafka, topic, start, Some("delay"))(MeasurementUnit.time.milliseconds)
557558

559+
/**
 * Builds the log marker for the `scheduler` component's "keepAlive" action.
 *
 * The marker is tagged with the given lease id (rendered as a string under the "leaseId" tag)
 * and carries no measurement unit.
 *
 * @param leaseId id of the lease the keep-alive refers to
 */
def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
560+
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
561+
558562
/*
559563
* General markers
560564
*/

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ object ConfigKeys {
266266
val controllerActivation = s"$controller.activation"
267267

268268
val etcd = "whisk.etcd"
269+
val etcdLeaseTimeout = "whisk.etcd.lease.timeout"
269270
val etcdPoolThreads = "whisk.etcd.pool.threads"
270271

271272
val activationStore = "whisk.activation-store"
@@ -290,5 +291,7 @@ object ConfigKeys {
290291

291292
val azBlob = "whisk.azure-blob"
292293

294+
val schedulerMaxPeek = "whisk.scheduler.max-peek"
295+
293296
val whiskClusterName = "whisk.cluster.name"
294297
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.service
19+
20+
import akka.actor.Status.{Failure => FailureMessage}
21+
import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
22+
import akka.pattern.pipe
23+
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter}
24+
import org.apache.openwhisk.core.ConfigKeys
25+
import org.apache.openwhisk.core.entity.InstanceId
26+
import org.apache.openwhisk.core.etcd.EtcdClient
27+
import org.apache.openwhisk.core.etcd.EtcdKV.InstanceKeys.instanceLease
28+
import pureconfig.loadConfigOrThrow
29+
30+
import scala.concurrent.duration._
31+
import scala.concurrent.{ExecutionContextExecutor, Future}
32+
import scala.util.{Failure, Success}
33+
34+
// States
35+
/** State of the lease keep-alive FSM. */
sealed trait KeepAliveServiceState
36+
/** No lease is in use yet: the actor is granting a lease or setting up its keep-alive. */
case object Ready extends KeepAliveServiceState
37+
/** A lease is held and a keep-alive worker is running for it. */
case object Active extends KeepAliveServiceState
38+
39+
// Data
40+
/** State data carried by the lease keep-alive FSM. */
sealed trait KeepAliveServiceData
41+
/** No lease has been granted (or the previous one is being replaced). */
case object NoData extends KeepAliveServiceData
42+
/** A granted lease: its id and TTL as reported by the grant response. */
case class Lease(id: Long, ttl: Long) extends KeepAliveServiceData
43+
/** Data of the `Active` state: the cancellable keep-alive worker together with the lease it refreshes. */
case class ActiveStates(worker: Cancellable, lease: Lease) extends KeepAliveServiceData
44+
45+
// Events received by the actor
46+
/** Handled in `Active`: stop the worker, revoke the current lease and grant a new one. */
case object RegrantLease
47+
/** Handled in `Active`: reply to the sender with the current [[Lease]]. */
case object GetLease
48+
/** Handled in `Ready` with `NoData`: request a new lease. */
case object GrantLease
49+
50+
// Events internally used
51+
/** Piped to the actor itself once a lease has been granted. */
case class SetLease(lease: Lease)
52+
/** Piped to the actor itself once the lease key has been stored; carries the keep-alive worker. */
case class SetWatcher(worker: Cancellable)
53+
54+
/**
 * FSM actor that obtains a lease for this instance and keeps it alive.
 *
 * Life cycle:
 *  - `Ready`: no usable lease yet. `GrantLease` requests a lease, `SetLease` starts the keep-alive
 *    worker and stores the instance key under the lease, `SetWatcher` moves the actor to `Active`.
 *  - `Active`: the lease is refreshed periodically. `GetLease` answers with the current lease;
 *    `RegrantLease` or the removal of the instance key replaces the lease and goes back to `Ready`.
 *
 * Messages that the current state does not handle are stashed and replayed on the next
 * transition into `Ready` or `Active`.
 *
 * @param etcdClient     client used to grant, refresh and revoke the lease and to store the instance key
 * @param instanceId     instance whose lease key is managed
 * @param watcherService actor that is asked to watch / unwatch the instance key
 */
class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watcherService: ActorRef)(
  implicit logging: Logging,
  actorSystem: ActorSystem)
    extends FSM[KeepAliveServiceState, KeepAliveServiceData]
    with Stash {

  implicit val ec: ExecutionContextExecutor = context.dispatcher

  private val leaseTimeout = loadConfigOrThrow[Int](ConfigKeys.etcdLeaseTimeout).seconds
  private val key = instanceLease(instanceId)
  private val watcherName = "lease-service"

  // Kick off the first grant; the message is handled once the actor has started.
  self ! GrantLease
  startWith(Ready, NoData)

  when(Ready) {
    case Event(GrantLease, NoData) =>
      etcdClient
        .grant(leaseTimeout.toSeconds)
        .map { res =>
          SetLease(Lease(res.getID, res.getTTL))
        }
        .pipeTo(self)
      stay()

    case Event(SetLease(lease), NoData) =>
      startKeepAliveService(lease)
        .pipeTo(self)
      logging.info(this, s"Granted a new lease $lease")
      stay() using lease

    case Event(SetWatcher(w), l: Lease) =>
      goto(Active) using ActiveStates(w, l)

    case Event(t: FailureMessage, _) =>
      logging.warn(this, s"Failed to grant new lease caused by: $t")
      self ! GrantLease
      // Always fall back to NoData. The failure may come from storing the key after a lease was
      // already set; keeping that Lease as state data would make the GrantLease above unmatched,
      // so it would be stashed with no transition left to replay it.
      stay() using NoData

    case _ => delay
  }

  when(Active) {
    case Event(WatchEndpointRemoved(`key`, `key`, _, false), ActiveStates(worker, lease)) =>
      logging.info(this, s"endpoint ie removed so recreate a lease")
      recreateLease(worker, lease)

    case Event(RegrantLease, ActiveStates(worker, lease)) =>
      logging.info(this, s"ReGrant a lease, old lease:${lease}")
      recreateLease(worker, lease)

    case Event(GetLease, ActiveStates(_, lease)) =>
      logging.info(this, s"send the lease(${lease}) to ${sender()}")
      sender() ! lease
      stay()

    case _ => delay
  }

  initialize()

  /**
   * Starts the periodic keep-alive for `lease` and stores the instance key under it.
   *
   * @param lease the freshly granted lease
   * @return a future completed with [[SetWatcher]] (carrying the keep-alive worker) once the key is stored;
   *         if storing the key fails the worker is cancelled and the future fails
   */
  private def startKeepAliveService(lease: Lease): Future[SetWatcher] = {
    val worker =
      actorSystem.scheduler.schedule(initialDelay = 0.second, interval = 500.milliseconds)(keepAliveOnce(lease))

    /**
     * To verify that the lease has been deleted because of a timeout,
     * create a key using the lease, watch the key, and receive an event for its deletion.
     */
    etcdClient
      .put(key, s"${lease.id}", lease.id)
      .map { _ =>
        watcherService ! WatchEndpoint(key, s"${lease.id}", false, watcherName, Set(DeleteEvent))
        SetWatcher(worker)
      }
      .andThen {
        // The worker never reaches the state data on this path, so nobody else could stop it.
        case Failure(_) => worker.cancel()
      }
  }

  /**
   * Refreshes `lease` once.
   *
   * Emits the keep-alive counter metric on success; on failure logs a warning and asks the actor
   * to replace the lease.
   *
   * @param lease the lease to refresh
   * @return a future with the id returned by the keep-alive response
   */
  private def keepAliveOnce(lease: Lease): Future[Long] = {
    etcdClient
      .keepAliveOnce(lease.id)
      .map(_.getID)
      .andThen {
        case Success(_) => MetricEmitter.emitCounterMetric(LoggingMarkers.SCHEDULER_KEEP_ALIVE(lease.id))
        case Failure(t) =>
          logging.warn(this, s"Failed to keep-alive of ${lease.id} caused by ${t}")
          self ! RegrantLease
      }
  }

  /**
   * Drops the current lease and requests a new one.
   *
   * @param worker keep-alive worker of the current lease
   * @param lease  the lease to revoke
   * @return the transition back to `Ready` with no data
   */
  private def recreateLease(worker: Cancellable, lease: Lease): State = {
    logging.info(this, s"recreate a lease, old lease: $lease")
    worker.cancel() // stop scheduler
    watcherService ! UnwatchEndpoint(key, false, watcherName) // stop watcher
    etcdClient
      .revoke(lease.id) // delete lease
      .onComplete(_ => self ! GrantLease) // create lease, whether or not the revoke succeeded
    goto(Ready) using NoData
  }

  // Unstash all messages stashed while in intermediate state
  onTransition {
    case _ -> Ready  => unstashAll()
    case _ -> Active => unstashAll()
  }

  /** Delays all incoming messages until unstashAll() is called */
  def delay: State = {
    stash()
    stay()
  }

  /** Stops the keep-alive worker, if one is held in the state data, and unwatches the instance key. */
  override def postStop(): Unit = {
    stateData match {
      case ActiveStates(w, _) => w.cancel() // stop scheduler if that exist
      case _                  => // do nothing
    }
    watcherService ! UnwatchEndpoint(key, false, watcherName)
  }
}
171+
172+
object LeaseKeepAliveService {

  /**
   * Creates the [[Props]] for a [[LeaseKeepAliveService]] actor, configured to run on the
   * "dispatchers.lease-service-dispatcher" dispatcher.
   *
   * @param etcdClient     passed through to the actor
   * @param instanceId     passed through to the actor
   * @param watcherService passed through to the actor
   */
  def props(etcdClient: EtcdClient, instanceId: InstanceId, watcherService: ActorRef)(
    implicit logging: Logging,
    actorSystem: ActorSystem): Props = {
    val serviceProps = Props(new LeaseKeepAliveService(etcdClient, instanceId, watcherService))
    serviceProps.withDispatcher("dispatchers.lease-service-dispatcher")
  }
}

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,21 @@
1818
package org.apache.openwhisk.core.scheduler
1919

2020
import akka.Done
21-
import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
21+
import akka.actor.{ActorRef, ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
2222
import akka.stream.ActorMaterializer
2323
import akka.util.Timeout
2424
import com.typesafe.config.ConfigValueFactory
2525
import kamon.Kamon
2626
import org.apache.openwhisk.common.Https.HttpsConfig
2727
import org.apache.openwhisk.common._
28-
import org.apache.openwhisk.core.WhiskConfig
28+
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
2929
import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
3030
import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
3131
import org.apache.openwhisk.core.connector._
3232
import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
3333
import org.apache.openwhisk.core.entity._
34+
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
35+
import org.apache.openwhisk.core.service.{LeaseKeepAliveService, WatcherService}
3436
import org.apache.openwhisk.http.BasicHttpService
3537
import org.apache.openwhisk.spi.SpiLoader
3638
import org.apache.openwhisk.utils.ExecutionContextFactory
@@ -55,10 +57,11 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
5557
val msgProvider = SpiLoader.get[MessagingProvider]
5658
val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
5759

58-
val maxPeek = "" // TODO: TBD
59-
val etcdClient = "" // TODO: TBD
60-
val watcherService = "" // TODO: TBD
61-
val leaseService = "" // TODO: TBD
60+
val maxPeek = loadConfigOrThrow[Int](ConfigKeys.schedulerMaxPeek)
61+
val etcdClient = EtcdClient(loadConfigOrThrow[EtcdConfig](ConfigKeys.etcd).hosts)
62+
val watcherService: ActorRef = actorSystem.actorOf(WatcherService.props(etcdClient))
63+
val leaseService =
64+
actorSystem.actorOf(LeaseKeepAliveService.props(etcdClient, schedulerId, watcherService))
6265

6366
implicit val entityStore = WhiskEntityStore.datastore()
6467
private val activationStore =
@@ -139,7 +142,7 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
139142
config,
140143
s"scheduler${schedulerId.asString}",
141144
s"scheduler${schedulerId.asString}",
142-
500, // TODO: to be updated with maxPeek variable
145+
maxPeek,
143146
maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
144147

145148
implicit val trasnid = TransactionId.containerCreation

0 commit comments

Comments
 (0)