@@ -4,13 +4,15 @@ import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
4
4
import akka .util .Timeout
5
5
import io .grpc .StatusRuntimeException
6
6
import org .apache .openwhisk .common .Logging
7
+ import org .apache .openwhisk .core .ConfigKeys
7
8
import org .apache .openwhisk .core .etcd .{EtcdClient , EtcdFollower , EtcdLeader }
8
9
import org .apache .openwhisk .core .service .DataManagementService .retryInterval
10
+ import pureconfig .loadConfigOrThrow
9
11
10
12
import scala .collection .concurrent .TrieMap
11
13
import scala .collection .mutable .{Map , Queue }
14
+ import scala .concurrent .ExecutionContext
12
15
import scala .concurrent .duration ._
13
- import scala .concurrent .{ExecutionContext , Future }
14
16
import scala .util .Success
15
17
16
18
// messages received by the actor
@@ -22,7 +24,7 @@ case class RegisterInitialData(key: String,
22
24
recipient : Option [ActorRef ] = None )
23
25
24
26
case class RegisterData (key : String , value : String , failoverEnabled : Boolean = true )
25
- case class DeRegisterData (key : String )
27
+ case class UnregisterData (key : String )
26
28
case class UpdateDataOnChange (key : String , value : String )
27
29
28
30
// messages sent by the actor
@@ -66,15 +68,15 @@ class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFac
66
68
// normally these messages will be sent when queues are created.
67
69
case request : ElectLeader =>
68
70
if (inProgressKeys.contains(request.key)) {
69
- logging.info(this , s " save request $request into a buffer " )
71
+ logging.info(this , s " save a request $request into a buffer " )
70
72
operations.getOrElseUpdate(request.key, Queue .empty[Any ]).enqueue(request)
71
73
} else {
72
74
worker ! request
73
75
inProgressKeys = inProgressKeys + request.key
74
76
}
75
77
76
78
case request : RegisterInitialData =>
77
- // send WatchEndpoint first as the put operation will be retry until success if failed
79
+ // send WatchEndpoint first as the put operation will be retried until success if failed
78
80
if (request.failoverEnabled)
79
81
watcherService ! WatchEndpoint (request.key, request.value, isPrefix = false , watcherName, Set (DeleteEvent ))
80
82
if (inProgressKeys.contains(request.key)) {
@@ -108,8 +110,9 @@ class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFac
108
110
109
111
case request : WatcherClosed =>
110
112
if (inProgressKeys.contains(request.key)) {
111
- // the new put|delete operation will erase influences made by older operations like put&delete
112
- // so we can remove these old operations
113
+ // The put|delete operations against the same key will overwrite the previous results.
114
+ // For example, if we put a value, delete it and put a new value again, the final result will be the new value.
115
+ // So we can remove these old operations
113
116
logging.info(this , s " save request $request into a buffer " )
114
117
val queue = operations.getOrElseUpdate(request.key, Queue .empty[Any ]).filter { value =>
115
118
value match {
@@ -126,8 +129,8 @@ class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFac
126
129
127
130
// It is required to close the watcher first before deleting etcd data
128
131
// It is supposed to receive the WatcherClosed message after the watcher is stopped.
129
- case msg : DeRegisterData =>
130
- watcherService ! UnWatchEndpoint (msg.key, isPrefix = false , watcherName, needFeedback = true )
132
+ case msg : UnregisterData =>
133
+ watcherService ! UnwatchEndpoint (msg.key, isPrefix = false , watcherName, needFeedback = true )
131
134
132
135
case WatchEndpointRemoved (_, key, value, false ) =>
133
136
self ! RegisterInitialData (key, value, failoverEnabled = false ) // the watcher is already setup
@@ -140,7 +143,6 @@ class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFac
140
143
dataCache.get(msg.key) match {
141
144
case Some (cached) if cached == msg.value =>
142
145
logging.debug(this , s " skip publishing data ${msg.key} because the data is not changed. " )
143
- // do nothing
144
146
145
147
case Some (cached) if cached != msg.value =>
146
148
dataCache.update(msg.key, msg.value)
@@ -155,21 +157,20 @@ class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFac
155
157
}
156
158
157
159
object DataManagementService {
158
- // Todo: Change to configuration
159
- val retryInterval : FiniteDuration = 1 .second
160
+ val retryInterval : FiniteDuration = loadConfigOrThrow[FiniteDuration ](ConfigKeys .dataManagementServiceRetryInterval)
160
161
161
162
def props (watcherService : ActorRef , workerFactory : ActorRefFactory => ActorRef )(implicit logging : Logging ,
162
163
actorSystem : ActorSystem ): Props = {
163
164
Props (new DataManagementService (watcherService, workerFactory))
164
165
}
165
166
}
166
167
167
- class EtcdWorker (etcdClient : EtcdClient , leaseService : ActorRef )(implicit val ec : ExecutionContext ,
168
+ private [service] class EtcdWorker (etcdClient : EtcdClient , leaseService : ActorRef )(implicit val ec : ExecutionContext ,
168
169
actorSystem : ActorSystem ,
169
170
logging : Logging )
170
171
extends Actor {
171
172
172
- private val parent = context.parent
173
+ private val dataManagementService = context.parent
173
174
private var lease : Option [Lease ] = None
174
175
leaseService ! GetLease
175
176
@@ -186,7 +187,7 @@ class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec
186
187
.andThen {
187
188
case Success (msg) =>
188
189
request.recipient ! ElectionResult (msg)
189
- parent ! FinishWork (request.key)
190
+ dataManagementService ! FinishWork (request.key)
190
191
}
191
192
.recover {
192
193
// if there is no lease, reissue it and retry immediately
@@ -215,7 +216,7 @@ class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec
215
216
.put(request.key, request.value, l.id)
216
217
.andThen {
217
218
case Success (_) =>
218
- parent ! FinishWork (request.key)
219
+ dataManagementService ! FinishWork (request.key)
219
220
}
220
221
.recover {
221
222
// if there is no lease, reissue it and retry immediately
@@ -243,12 +244,12 @@ class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec
243
244
etcdClient
244
245
.putTxn(request.key, request.value, 0 , l.id)
245
246
.map { res =>
246
- parent ! FinishWork (request.key)
247
+ dataManagementService ! FinishWork (request.key)
247
248
if (res.getSucceeded) {
248
- logging.debug (this , s " data is stored. " )
249
+ logging.info (this , s " initial data storing succeeds for ${request.key} " )
249
250
request.recipient.map(_ ! InitialDataStorageResults (request.key, Right (Done ())))
250
251
} else {
251
- logging.debug (this , s " data is already stored for: $request" )
252
+ logging.info (this , s " data is already stored for: $request, cancel the initial data storing " )
252
253
request.recipient.map(_ ! InitialDataStorageResults (request.key, Left (AlreadyExist ())))
253
254
}
254
255
}
@@ -278,7 +279,7 @@ class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec
278
279
.del(msg.key)
279
280
.andThen {
280
281
case Success (_) =>
281
- parent ! FinishWork (msg.key)
282
+ dataManagementService ! FinishWork (msg.key)
282
283
}
283
284
.recover {
284
285
// if there is no lease, reissue it and retry immediately
@@ -296,11 +297,8 @@ class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec
296
297
297
298
}
298
299
299
- private def sendMessageToSelfAfter (msg : Any , retryInterval : FiniteDuration ): Future [Unit ] = {
300
- akka.pattern.after(retryInterval, actorSystem.scheduler) {
301
- self ! msg
302
- Future .successful({})
303
- }
300
+ private def sendMessageToSelfAfter (msg : Any , retryInterval : FiniteDuration ) = {
301
+ actorSystem.scheduler.scheduleOnce(retryInterval, self, msg)
304
302
}
305
303
}
306
304
0 commit comments