Skip to content

Commit ecb1509

Browse files
style95 and bdoyle0182 authored
[New Scheduler] Add DataManagementService (#5063)
* Add DataManagementService * Update common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala Apply comment Co-authored-by: Brendan Doyle <[email protected]> * Apply suggestions from code review Update comments Co-authored-by: Brendan Doyle <[email protected]> * Apply comments * Add unit tests for DataManagementService * Remove unused variable * Add the license header * Change Lease * Pull docker image for the api gateway in advance Co-authored-by: Brendan Doyle <[email protected]>
1 parent 625fc5b commit ecb1509

File tree

8 files changed

+627
-3
lines changed

8 files changed

+627
-3
lines changed

ansible/group_vars/all

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,3 +450,8 @@ etcd_connect_string: "{% set ret = [] %}\
450450
{{ ret.append( hostvars[host].ansible_host + ':' + ((etcd.client.port+loop.index-1)|string) ) }}\
451451
{% endfor %}\
452452
{{ ret | join(',') }}"
453+
454+
scheduler:
  dataManagementService:
    # Retry interval used by the scheduler's DataManagementService; override it with
    # `scheduler_dataManagementService_retryInterval`.
    # The fallback must be a quoted Jinja2 string literal: an unquoted `1 second` is a template syntax error.
    retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
457+

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,4 +294,6 @@ object ConfigKeys {
294294
val schedulerMaxPeek = "whisk.scheduler.max-peek"
295295

296296
val whiskClusterName = "whisk.cluster.name"
297+
298+
val dataManagementServiceRetryInterval = "whisk.scheduler.data-management-service.retryInterval"
297299
}

common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,14 @@ import java.util.concurrent.Executors
2727

2828
import org.apache.openwhisk.core.ConfigKeys
2929
import org.apache.openwhisk.core.etcd.EtcdType._
30+
import org.apache.openwhisk.core.service.Lease
3031
import pureconfig.loadConfigOrThrow
3132
import spray.json.DefaultJsonProtocol
3233

3334
import scala.language.implicitConversions
3435
import scala.annotation.tailrec
3536
import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
3637

37-
case class Lease(id: Long, ttl: Long)
38-
3938
object RichListenableFuture {
4039
implicit def convertToFuture[T](lf: ListenableFuture[T])(implicit ece: ExecutionContextExecutor): Future[T] = {
4140
val p = Promise[T]()
Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.service
19+
20+
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
21+
import akka.util.Timeout
22+
import io.grpc.StatusRuntimeException
23+
import org.apache.openwhisk.common.Logging
24+
import org.apache.openwhisk.core.ConfigKeys
25+
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
26+
import org.apache.openwhisk.core.service.DataManagementService.retryInterval
27+
import pureconfig.loadConfigOrThrow
28+
29+
import scala.collection.concurrent.TrieMap
30+
import scala.collection.mutable.{Map, Queue}
31+
import scala.concurrent.ExecutionContext
32+
import scala.concurrent.duration._
33+
import scala.util.Success
34+
35+
// Messages accepted by DataManagementService.
// ElectLeader names its recipient explicitly so that a retried request still knows where to send the result.

/**
 * Asks for a leader election on `key`.
 *
 * @param key the ETCD key the election is performed on
 * @param value the value passed to the election together with the key
 * @param recipient the actor that receives the resulting [[ElectionResult]]
 * @param watchEnabled not read by the code in this file -- TODO confirm its consumer
 */
case class ElectLeader(
  key: String,
  value: String,
  recipient: ActorRef,
  watchEnabled: Boolean = true)

/**
 * Asks to store `value` under `key` only when the key is not stored yet.
 *
 * @param key the ETCD key
 * @param value the value to store
 * @param failoverEnabled when true, the service also asks the watcher service to watch delete events of the key
 * @param recipient optional actor that receives the [[InitialDataStorageResults]]
 */
case class RegisterInitialData(
  key: String,
  value: String,
  failoverEnabled: Boolean = true,
  recipient: Option[ActorRef] = None)

/**
 * Asks to store `value` under `key`.
 *
 * @param key the ETCD key
 * @param value the value to store
 * @param failoverEnabled when true, the service also asks the watcher service to watch delete events of the key
 */
case class RegisterData(
  key: String,
  value: String,
  failoverEnabled: Boolean = true)

/**
 * Asks to remove the data stored under `key`; the service first asks the watcher service to stop watching it.
 *
 * @param key the ETCD key
 */
case class UnregisterData(key: String)

/**
 * Asks to store `value` under `key` unless the service's cache already holds the same value for that key.
 *
 * @param key the ETCD key
 * @param value the value to publish
 */
case class UpdateDataOnChange(
  key: String,
  value: String)
46+
47+
// Messages emitted by DataManagementService and its worker.

/**
 * Outcome of a leader election, sent to the recipient named in the [[ElectLeader]] request.
 *
 * @param leadership the value produced by the election call
 */
case class ElectionResult(
  leadership: Either[EtcdFollower, EtcdLeader])

/**
 * Sent by the worker to DataManagementService once the operation for `key` has completed.
 *
 * @param key the key whose operation finished
 */
case class FinishWork(
  key: String)

/**
 * Outcome of a [[RegisterInitialData]] request, sent to its optional recipient.
 *
 * @param key the key of the request
 * @param result Right(Done()) when the data was stored, Left(AlreadyExist()) when the transaction did not succeed
 */
case class InitialDataStorageResults(
  key: String,
  result: Either[AlreadyExist, Done])

/** Marks an initial data registration that stored the data. */
case class Done()

/** Marks an initial data registration that was cancelled because data was already stored. */
case class AlreadyExist()
53+
54+
/**
55+
* This service is in charge of storing given data to ETCD.
56+
* In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD.
57+
* So it guarantees the data is eventually stored.
58+
*/
59+
class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
60+
implicit logging: Logging,
61+
actorSystem: ActorSystem)
62+
extends Actor {
63+
private implicit val ec = context.dispatcher
64+
65+
implicit val requestTimeout: Timeout = Timeout(5.seconds)
66+
private[service] val dataCache = TrieMap[String, String]()
67+
private val operations = Map.empty[String, Queue[Any]]
68+
private var inProgressKeys = Set.empty[String]
69+
private val watcherName = "data-management-service"
70+
71+
private val worker = workerFactory(context)
72+
73+
override def receive: Receive = {
74+
case FinishWork(key) =>
75+
// send waiting operation to worker if there is any, else update the inProgressKeys
76+
val ops = operations.get(key)
77+
if (ops.nonEmpty && ops.get.nonEmpty) {
78+
val operation = ops.get.dequeue()
79+
worker ! operation
80+
} else {
81+
inProgressKeys = inProgressKeys - key
82+
operations.remove(key) // remove empty queue from the map to free memory
83+
}
84+
85+
// normally these messages will be sent when queues are created.
86+
case request: ElectLeader =>
87+
if (inProgressKeys.contains(request.key)) {
88+
logging.info(this, s"save a request $request into a buffer")
89+
operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
90+
} else {
91+
worker ! request
92+
inProgressKeys = inProgressKeys + request.key
93+
}
94+
95+
case request: RegisterInitialData =>
96+
// send WatchEndpoint first as the put operation will be retried until success if failed
97+
if (request.failoverEnabled)
98+
watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
99+
if (inProgressKeys.contains(request.key)) {
100+
logging.info(this, s"save request $request into a buffer")
101+
operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
102+
} else {
103+
worker ! request
104+
inProgressKeys = inProgressKeys + request.key
105+
}
106+
107+
case request: RegisterData =>
108+
// send WatchEndpoint first as the put operation will be retried until success if failed
109+
if (request.failoverEnabled)
110+
watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
111+
if (inProgressKeys.contains(request.key)) {
112+
// the new put|delete operation will erase influences made by older operations like put&delete
113+
// so we can remove these old operations
114+
logging.info(this, s"save request $request into a buffer")
115+
val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
116+
value match {
117+
case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
118+
case _ => true
119+
}
120+
}
121+
queue.enqueue(request)
122+
operations.update(request.key, queue)
123+
} else {
124+
worker ! request
125+
inProgressKeys = inProgressKeys + request.key
126+
}
127+
128+
case request: WatcherClosed =>
129+
if (inProgressKeys.contains(request.key)) {
130+
// The put|delete operations against the same key will overwrite the previous results.
131+
// For example, if we put a value, delete it and put a new value again, the final result will be the new value.
132+
// So we can remove these old operations
133+
logging.info(this, s"save request $request into a buffer")
134+
val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
135+
value match {
136+
case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
137+
case _ => true
138+
}
139+
}
140+
queue.enqueue(request)
141+
operations.update(request.key, queue)
142+
} else {
143+
worker ! request
144+
inProgressKeys = inProgressKeys + request.key
145+
}
146+
147+
// It is required to close the watcher first before deleting etcd data
148+
// It is supposed to receive the WatcherClosed message after the watcher is stopped.
149+
case msg: UnregisterData =>
150+
watcherService ! UnwatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
151+
152+
case WatchEndpointRemoved(_, key, value, false) =>
153+
self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
154+
155+
// It should not receive "prefixed" data
156+
case WatchEndpointRemoved(_, key, value, true) =>
157+
logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
158+
159+
case msg: UpdateDataOnChange =>
160+
dataCache.get(msg.key) match {
161+
case Some(cached) if cached == msg.value =>
162+
logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
163+
164+
case Some(cached) if cached != msg.value =>
165+
dataCache.update(msg.key, msg.value)
166+
self ! RegisterData(msg.key, msg.value, failoverEnabled = false) // the watcher is already setup
167+
168+
case None =>
169+
dataCache.put(msg.key, msg.value)
170+
self ! RegisterData(msg.key, msg.value)
171+
172+
}
173+
}
174+
}
175+
176+
object DataManagementService {

  /** Interval used when an ETCD operation is retried, loaded from the configuration. */
  val retryInterval: FiniteDuration =
    loadConfigOrThrow[FiniteDuration](ConfigKeys.dataManagementServiceRetryInterval)

  /**
   * Creates the Props of a [[DataManagementService]].
   *
   * @param watcherService the actor that receives watch/unwatch requests
   * @param workerFactory creates the worker actor that performs the ETCD operations
   */
  def props(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(implicit logging: Logging,
                                                                                  actorSystem: ActorSystem): Props =
    Props(new DataManagementService(watcherService, workerFactory))
}
184+
185+
/**
 * Worker that performs the ETCD operations requested by its parent actor.
 *
 * It asks `leaseService` for a lease on start, keeps the last received one, and reports [[FinishWork]] to the
 * parent when an operation completes. A failed operation is sent to this actor again after `retryInterval`,
 * so it is retried until it succeeds.
 *
 * @param etcdClient the client used to access ETCD
 * @param leaseService the actor that answers GetLease with a Lease
 */
private[service] class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
                                                                                  actorSystem: ActorSystem,
                                                                                  logging: Logging)
    extends Actor {

  // Internal message sent from Future callbacks: `lease` is actor state, so it must only be modified while
  // processing a message, never directly on the thread that completes a Future.
  private case object ResetLease

  private val dataManagementService = context.parent
  private var lease: Option[Lease] = None
  leaseService ! GetLease

  override def receive: Receive = {
    case msg: Lease =>
      lease = Some(msg)

    case ResetLease =>
      lease = None
      leaseService ! GetLease

    // leader election + endpoint management
    case request: ElectLeader =>
      withLease(request, s"lease not found, retry storing data") { l =>
        etcdClient
          .electLeader(request.key, request.value, l)
          .andThen {
            case Success(msg) =>
              request.recipient ! ElectionResult(msg)
              dataManagementService ! FinishWork(request.key)
          }
          .recover(
            retryLater(
              request,
              t => s"a lease is expired while leader election, reissue it: $t",
              t => s"unexpected error happened: $t, retry storing data"))
      }

    // only endpoint management
    case request: RegisterData =>
      withLease(request, s"lease not found, retry storing data ${request.key}") { l =>
        etcdClient
          .put(request.key, request.value, l.id)
          .andThen {
            case Success(_) =>
              dataManagementService ! FinishWork(request.key)
          }
          .recover(
            retryLater(
              request,
              t => s"a lease is expired while registering data ${request.key}, reissue it: $t",
              t => s"unexpected error happened: $t, retry storing data ${request.key}"))
      }

    // it stores the data only if there is no such data yet
    case request: RegisterInitialData =>
      withLease(request, s"lease not found, retry storing data for ${request.key}") { l =>
        etcdClient
          .putTxn(request.key, request.value, 0, l.id)
          .map { res =>
            dataManagementService ! FinishWork(request.key)
            if (res.getSucceeded) {
              logging.info(this, s"initial data storing succeeds for ${request.key}")
              request.recipient.foreach(_ ! InitialDataStorageResults(request.key, Right(Done())))
            } else {
              logging.info(this, s"data is already stored for: $request, cancel the initial data storing")
              request.recipient.foreach(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
            }
          }
          .recover(
            retryLater(
              request,
              t => s"a lease is expired while registering an initial data ${request.key}, reissue it: $t",
              t => s"unexpected error happened: $t, retry storing data for ${request.key}"))
      }

    // the deletion does not take a lease, so it is issued even when no lease is cached
    case msg: WatcherClosed =>
      etcdClient
        .del(msg.key)
        .andThen {
          case Success(_) =>
            dataManagementService ! FinishWork(msg.key)
        }
        .recover(
          retryLater(
            msg,
            t => s"a lease is expired while deleting data ${msg.key}, reissue it: $t",
            t => s"unexpected error happened: $t, retry deleting data for ${msg.key}"))
  }

  /**
   * Runs `operation` with the cached lease. When no lease is cached, it logs `missingLeaseLog`, asks the lease
   * service for a lease, and sends `request` to this actor again after `retryInterval`.
   */
  private def withLease(request: Any, missingLeaseLog: => String)(operation: Lease => Any): Unit =
    lease match {
      case Some(l) =>
        operation(l)
      case None =>
        logging.warn(this, missingLeaseLog)
        leaseService ! GetLease
        sendMessageToSelfAfter(request, retryInterval)
    }

  /**
   * Builds the failure handler shared by all operations: every failure is logged and `request` is sent to this
   * actor again after `retryInterval`, so the operation is retried until it succeeds.
   * A StatusRuntimeException additionally drops the cached lease and requests a new one.
   *
   * @param request the message to process again
   * @param leaseExpiredLog builds the log message for a StatusRuntimeException
   * @param unexpectedLog builds the log message for any other failure
   */
  private def retryLater(request: Any,
                         leaseExpiredLog: Throwable => String,
                         unexpectedLog: Throwable => String): PartialFunction[Throwable, Any] = {
    case t: StatusRuntimeException =>
      logging.warn(this, leaseExpiredLog(t))
      // enqueued before the retry is scheduled, so the lease is reset before the request is processed again
      self ! ResetLease
      sendMessageToSelfAfter(request, retryInterval)

    // it should retry forever until the operation succeeds
    case t: Throwable =>
      logging.warn(this, unexpectedLog(t))
      sendMessageToSelfAfter(request, retryInterval)
  }

  /** Schedules `msg` to be delivered to this actor once, after `retryInterval`. */
  private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration) = {
    actorSystem.scheduler.scheduleOnce(retryInterval, self, msg)
  }
}
321+
322+
object EtcdWorker {

  /**
   * Creates the Props of an [[EtcdWorker]].
   *
   * @param etcdClient the client the worker uses to access ETCD
   * @param leaseService the actor the worker asks for a lease
   */
  def props(etcdClient: EtcdClient, leaseService: ActorRef)(implicit ec: ExecutionContext,
                                                            actorSystem: ActorSystem,
                                                            logging: Logging): Props =
    Props(new EtcdWorker(etcdClient, leaseService))
}

common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ case class WatchEndpointInserted(override val watchKey: String,
5252
extends WatchEndpointOperation(watchKey, key, value, isPrefix)
5353
case class WatcherClosed(key: String, isPrefix: Boolean)
5454

55+
// These are abstractions for events from ETCD.
5556
sealed trait EtcdEvent
5657
case object PutEvent extends EtcdEvent
5758
case object DeleteEvent extends EtcdEvent

tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdLeaderShipUnitTests.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ import com.ibm.etcd.client.{EtcdClient => Client}
2828
import common.{StreamLogging, WskActorSystem}
2929
import io.grpc.{StatusRuntimeException, Status => GrpcStatus}
3030
import org.apache.openwhisk.core.etcd.EtcdType._
31-
import org.apache.openwhisk.core.etcd.{EtcdFollower, EtcdLeader, EtcdLeadershipApi, Lease}
31+
import org.apache.openwhisk.core.etcd.{EtcdFollower, EtcdLeader, EtcdLeadershipApi}
32+
import org.apache.openwhisk.core.service.Lease
3233
import org.junit.runner.RunWith
3334
import org.scalatest.concurrent.PatienceConfiguration.Timeout
3435
import org.scalatest.concurrent.ScalaFutures

0 commit comments

Comments
 (0)