Skip to content

Commit 508d699

Browse files
bdoyle0182Brendan Doyle
authored andcommitted
Clean Up Etcd Worker Actor (apache#5323)
* clean up etcd worker actor * revert etcd client local change for unit testing * fix scala 2.13 compilation Co-authored-by: Brendan Doyle <[email protected]>
1 parent d3b6979 commit 508d699

File tree

5 files changed

+347
-154
lines changed

5 files changed

+347
-154
lines changed
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package org.apache.openwhisk.core.etcd
2+
3+
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Timers}
4+
import io.grpc.StatusRuntimeException
5+
import org.apache.openwhisk.common.Logging
6+
import org.apache.openwhisk.core.etcd.EtcdWorker.GetLeaseAndRetry
7+
import org.apache.openwhisk.core.service.DataManagementService.retryInterval
8+
import org.apache.openwhisk.core.service.{
9+
AlreadyExist,
10+
Done,
11+
ElectLeader,
12+
ElectionResult,
13+
FinishWork,
14+
GetLease,
15+
InitialDataStorageResults,
16+
Lease,
17+
RegisterData,
18+
RegisterInitialData,
19+
WatcherClosed
20+
}
21+
22+
import scala.concurrent.ExecutionContext
23+
import scala.concurrent.duration.FiniteDuration
24+
import scala.util.Success
25+
26+
class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
27+
actorSystem: ActorSystem,
28+
logging: Logging)
29+
extends Actor
30+
with Timers {
31+
32+
private val dataManagementService = context.parent
33+
private var lease: Option[Lease] = None
34+
leaseService ! GetLease
35+
36+
override def receive: Receive = {
37+
case msg: Lease =>
38+
lease = Some(msg)
39+
case msg: GetLeaseAndRetry =>
40+
logging.warn(this, msg.log)
41+
if (!msg.skipLeaseRefresh) {
42+
if (msg.clearLease) {
43+
lease = None
44+
}
45+
leaseService ! GetLease
46+
}
47+
sendMessageToSelfAfter(msg.request, retryInterval)
48+
// leader election + endpoint management
49+
case request: ElectLeader =>
50+
lease match {
51+
case Some(l) =>
52+
etcdClient
53+
.electLeader(request.key, request.value, l)
54+
.andThen {
55+
case Success(msg) =>
56+
request.recipient ! ElectionResult(msg)
57+
dataManagementService ! FinishWork(request.key)
58+
}
59+
.recover {
60+
// if there is no lease, reissue it and retry immediately
61+
case t: StatusRuntimeException =>
62+
self ! GetLeaseAndRetry(request, s"a lease is expired while leader election, reissue it: $t")
63+
// it should retry forever until the data is stored
64+
case t: Throwable =>
65+
self ! GetLeaseAndRetry(
66+
request,
67+
s"unexpected error happened: $t, retry storing data",
68+
skipLeaseRefresh = true)
69+
}
70+
case None =>
71+
self ! GetLeaseAndRetry(request, s"lease not found, retry storing data ${request.key}", clearLease = false)
72+
}
73+
74+
// only endpoint management
75+
case request: RegisterData =>
76+
lease match {
77+
case Some(l) =>
78+
etcdClient
79+
.put(request.key, request.value, l.id)
80+
.andThen {
81+
case Success(_) =>
82+
dataManagementService ! FinishWork(request.key)
83+
}
84+
.recover {
85+
// if there is no lease, reissue it and retry immediately
86+
case t: StatusRuntimeException =>
87+
self ! GetLeaseAndRetry(
88+
request,
89+
s"a lease is expired while registering data ${request.key}, reissue it: $t")
90+
// it should retry forever until the data is stored
91+
case t: Throwable =>
92+
self ! GetLeaseAndRetry(
93+
request,
94+
s"unexpected error happened: $t, retry storing data ${request.key}",
95+
skipLeaseRefresh = true)
96+
}
97+
case None =>
98+
self ! GetLeaseAndRetry(request, s"lease not found, retry storing data ${request.key}", clearLease = false)
99+
}
100+
// it stores the data iif there is no such one
101+
case request: RegisterInitialData =>
102+
lease match {
103+
case Some(l) =>
104+
etcdClient
105+
.putTxn(request.key, request.value, 0, l.id)
106+
.map { res =>
107+
dataManagementService ! FinishWork(request.key)
108+
if (res.getSucceeded) {
109+
logging.info(this, s"initial data storing succeeds for ${request.key}")
110+
request.recipient.map(_ ! InitialDataStorageResults(request.key, Right(Done())))
111+
} else {
112+
logging.info(this, s"data is already stored for: $request, cancel the initial data storing")
113+
request.recipient.map(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
114+
}
115+
}
116+
.recover {
117+
// if there is no lease, reissue it and retry immediately
118+
case t: StatusRuntimeException =>
119+
self ! GetLeaseAndRetry(
120+
request,
121+
s"a lease is expired while registering an initial data ${request.key}, reissue it: $t")
122+
// it should retry forever until the data is stored
123+
case t: Throwable =>
124+
self ! GetLeaseAndRetry(
125+
request,
126+
s"unexpected error happened: $t, retry storing data ${request.key}",
127+
skipLeaseRefresh = true)
128+
}
129+
case None =>
130+
self ! GetLeaseAndRetry(request, s"lease not found, retry storing data ${request.key}", clearLease = false)
131+
}
132+
133+
case msg: WatcherClosed =>
134+
etcdClient
135+
.del(msg.key)
136+
.andThen {
137+
case Success(_) =>
138+
dataManagementService ! FinishWork(msg.key)
139+
}
140+
.recover {
141+
// if there is no lease, reissue it and retry immediately
142+
case t: StatusRuntimeException =>
143+
self ! GetLeaseAndRetry(msg, s"a lease is expired while deleting data ${msg.key}, reissue it: $t")
144+
// it should retry forever until the data is stored
145+
case t: Throwable =>
146+
self ! GetLeaseAndRetry(
147+
msg,
148+
s"unexpected error happened: $t, retry storing data for ${msg.key}",
149+
skipLeaseRefresh = true)
150+
}
151+
}
152+
153+
private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration) = {
154+
timers.startSingleTimer(msg, msg, retryInterval)
155+
}
156+
}
157+
158+
object EtcdWorker {
159+
case class GetLeaseAndRetry(request: Any, log: String, clearLease: Boolean = true, skipLeaseRefresh: Boolean = false)
160+
161+
def props(etcdClient: EtcdClient, leaseService: ActorRef)(implicit ec: ExecutionContext,
162+
actorSystem: ActorSystem,
163+
logging: Logging): Props = {
164+
Props(new EtcdWorker(etcdClient, leaseService))
165+
}
166+
}

common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala

Lines changed: 1 addition & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,14 @@ package org.apache.openwhisk.core.service
1919

2020
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
2121
import akka.util.Timeout
22-
import io.grpc.StatusRuntimeException
2322
import org.apache.openwhisk.common.Logging
2423
import org.apache.openwhisk.core.ConfigKeys
25-
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
26-
import org.apache.openwhisk.core.service.DataManagementService.retryInterval
24+
import org.apache.openwhisk.core.etcd.{EtcdFollower, EtcdLeader}
2725
import pureconfig.loadConfigOrThrow
2826

2927
import scala.collection.concurrent.TrieMap
3028
import scala.collection.mutable.{Map, Queue}
31-
import scala.concurrent.ExecutionContext
3229
import scala.concurrent.duration._
33-
import scala.util.Success
3430

3531
// messages received by the actor
3632
// it is required to specify a recipient directly for the retryable message processing
@@ -181,148 +177,3 @@ object DataManagementService {
181177
Props(new DataManagementService(watcherService, workerFactory))
182178
}
183179
}
184-
185-
private[service] class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
186-
actorSystem: ActorSystem,
187-
logging: Logging)
188-
extends Actor {
189-
190-
private val dataManagementService = context.parent
191-
private var lease: Option[Lease] = None
192-
leaseService ! GetLease
193-
194-
override def receive: Receive = {
195-
case msg: Lease =>
196-
lease = Some(msg)
197-
198-
// leader election + endpoint management
199-
case request: ElectLeader =>
200-
lease match {
201-
case Some(l) =>
202-
etcdClient
203-
.electLeader(request.key, request.value, l)
204-
.andThen {
205-
case Success(msg) =>
206-
request.recipient ! ElectionResult(msg)
207-
dataManagementService ! FinishWork(request.key)
208-
}
209-
.recover {
210-
// if there is no lease, reissue it and retry immediately
211-
case t: StatusRuntimeException =>
212-
logging.warn(this, s"a lease is expired while leader election, reissue it: $t")
213-
lease = None
214-
leaseService ! GetLease
215-
sendMessageToSelfAfter(request, retryInterval)
216-
217-
// it should retry forever until the data is stored
218-
case t: Throwable =>
219-
logging.warn(this, s"unexpected error happened: $t, retry storing data")
220-
sendMessageToSelfAfter(request, retryInterval)
221-
}
222-
case None =>
223-
logging.warn(this, s"lease not found, retry storing data")
224-
leaseService ! GetLease
225-
sendMessageToSelfAfter(request, retryInterval)
226-
}
227-
228-
// only endpoint management
229-
case request: RegisterData =>
230-
lease match {
231-
case Some(l) =>
232-
etcdClient
233-
.put(request.key, request.value, l.id)
234-
.andThen {
235-
case Success(_) =>
236-
dataManagementService ! FinishWork(request.key)
237-
}
238-
.recover {
239-
// if there is no lease, reissue it and retry immediately
240-
case t: StatusRuntimeException =>
241-
logging.warn(this, s"a lease is expired while registering data ${request.key}, reissue it: $t")
242-
lease = None
243-
leaseService ! GetLease
244-
sendMessageToSelfAfter(request, retryInterval)
245-
246-
// it should retry forever until the data is stored
247-
case t: Throwable =>
248-
logging.warn(this, s"unexpected error happened: $t, retry storing data ${request.key}")
249-
sendMessageToSelfAfter(request, retryInterval)
250-
}
251-
case None =>
252-
logging.warn(this, s"lease not found, retry storing data ${request.key}")
253-
leaseService ! GetLease
254-
sendMessageToSelfAfter(request, retryInterval)
255-
}
256-
257-
// it stores the data iif there is no such one
258-
case request: RegisterInitialData =>
259-
lease match {
260-
case Some(l) =>
261-
etcdClient
262-
.putTxn(request.key, request.value, 0, l.id)
263-
.map { res =>
264-
dataManagementService ! FinishWork(request.key)
265-
if (res.getSucceeded) {
266-
logging.info(this, s"initial data storing succeeds for ${request.key}")
267-
request.recipient.map(_ ! InitialDataStorageResults(request.key, Right(Done())))
268-
} else {
269-
logging.info(this, s"data is already stored for: $request, cancel the initial data storing")
270-
request.recipient.map(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
271-
}
272-
}
273-
.recover {
274-
// if there is no lease, reissue it and retry immediately
275-
case t: StatusRuntimeException =>
276-
logging.warn(
277-
this,
278-
s"a lease is expired while registering an initial data ${request.key}, reissue it: $t")
279-
lease = None
280-
leaseService ! GetLease
281-
sendMessageToSelfAfter(request, retryInterval)
282-
283-
// it should retry forever until the data is stored
284-
case t: Throwable =>
285-
logging.warn(this, s"unexpected error happened: $t, retry storing data for ${request.key}")
286-
sendMessageToSelfAfter(request, retryInterval)
287-
}
288-
case None =>
289-
logging.warn(this, s"lease not found, retry storing data for ${request.key}")
290-
leaseService ! GetLease
291-
sendMessageToSelfAfter(request, retryInterval)
292-
}
293-
294-
case msg: WatcherClosed =>
295-
etcdClient
296-
.del(msg.key)
297-
.andThen {
298-
case Success(_) =>
299-
dataManagementService ! FinishWork(msg.key)
300-
}
301-
.recover {
302-
// if there is no lease, reissue it and retry immediately
303-
case t: StatusRuntimeException =>
304-
logging.warn(this, s"a lease is expired while deleting data ${msg.key}, reissue it: $t")
305-
lease = None
306-
leaseService ! GetLease
307-
sendMessageToSelfAfter(msg, retryInterval)
308-
309-
// it should retry forever until the data is stored
310-
case t: Throwable =>
311-
logging.warn(this, s"unexpected error happened: $t, retry storing data for ${msg.key}")
312-
sendMessageToSelfAfter(msg, retryInterval)
313-
}
314-
315-
}
316-
317-
private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration) = {
318-
actorSystem.scheduler.scheduleOnce(retryInterval, self, msg)
319-
}
320-
}
321-
322-
object EtcdWorker {
323-
def props(etcdClient: EtcdClient, leaseService: ActorRef)(implicit ec: ExecutionContext,
324-
actorSystem: ActorSystem,
325-
logging: Logging): Props = {
326-
Props(new EtcdWorker(etcdClient, leaseService))
327-
}
328-
}

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
3737
import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys.queue
3838
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, SchedulerKeys}
3939
import org.apache.openwhisk.core.etcd.EtcdType._
40-
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
40+
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig, EtcdWorker}
4141
import org.apache.openwhisk.core.invoker.Invoker.InvokerEnabled
4242
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates}
43-
import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
43+
import org.apache.openwhisk.core.service.{DataManagementService, LeaseKeepAliveService, WatcherService}
4444
import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
4545
import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest}
4646
import org.apache.openwhisk.spi.SpiLoader

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentEx
3636
import org.apache.openwhisk.core.entity._
3737
import org.apache.openwhisk.core.etcd.EtcdKV.{QueueKeys, SchedulerKeys}
3838
import org.apache.openwhisk.core.etcd.EtcdType.ByteStringToString
39-
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
39+
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig, EtcdWorker}
4040
import org.apache.openwhisk.core.scheduler.container.{ContainerManager, CreationJobManager}
4141
import org.apache.openwhisk.core.scheduler.grpc.ActivationServiceImpl
4242
import org.apache.openwhisk.core.scheduler.queue._
43-
import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
43+
import org.apache.openwhisk.core.service.{DataManagementService, LeaseKeepAliveService, WatcherService}
4444
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
4545
import org.apache.openwhisk.grpc.ActivationServiceHandler
4646
import org.apache.openwhisk.http.BasicHttpService

0 commit comments

Comments
 (0)