Skip to content

Commit dd4e051

Browse files
committed
Include workers in state data to thread safe
1 parent 9527740 commit dd4e051

File tree

2 files changed

+46
-24
lines changed

2 files changed

+46
-24
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/service/LeaseKeepAliveService.scala

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ case object Active extends KeepAliveServiceState
4040
sealed trait KeepAliveServiceData
4141
case object NoData extends KeepAliveServiceData
4242
case class Lease(id: Long, ttl: Long) extends KeepAliveServiceData
43+
case class ActiveStates(worker: Cancellable, lease: Lease) extends KeepAliveServiceData
4344

4445
// Events received by the actor
4546
case object RegrantLease
@@ -48,7 +49,7 @@ case object GrantLease
4849

4950
// Events internally used
5051
case class SetLease(lease: Lease)
51-
case object SetWatcher
52+
case class SetWatcher(worker: Cancellable)
5253

5354
class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watcherService: ActorRef)(
5455
implicit logging: Logging,
@@ -59,15 +60,14 @@ class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watc
5960
implicit val ec: ExecutionContextExecutor = context.dispatcher
6061

6162
private val leaseTimeout = loadConfigOrThrow[Int](ConfigKeys.etcdLeaseTimeout).seconds
62-
private var worker: Option[Cancellable] = None
6363
private val key = instanceLease(instanceId)
6464
private val watcherName = "lease-service"
6565

6666
self ! GrantLease
6767
startWith(Ready, NoData)
6868

6969
when(Ready) {
70-
case Event(GrantLease, _) =>
70+
case Event(GrantLease, NoData) =>
7171
etcdClient
7272
.grant(leaseTimeout.toSeconds)
7373
.map { res =>
@@ -76,14 +76,14 @@ class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watc
7676
.pipeTo(self)
7777
stay
7878

79-
case Event(SetLease(lease), _) =>
79+
case Event(SetLease(lease), NoData) =>
8080
startKeepAliveService(lease)
8181
.pipeTo(self)
8282
logging.info(this, s"Granted a new lease $lease")
8383
stay using lease
8484

85-
case Event(SetWatcher, lease: Lease) =>
86-
goto(Active) using lease
85+
case Event(SetWatcher(w), l: Lease) =>
86+
goto(Active) using ActiveStates(w, l)
8787

8888
case Event(t: FailureMessage, _) =>
8989
logging.warn(this, s"Failed to grant new lease caused by: $t")
@@ -94,15 +94,15 @@ class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watc
9494
}
9595

9696
when(Active) {
97-
case Event(WatchEndpointRemoved(`key`, `key`, _, false), lease: Lease) =>
97+
case Event(WatchEndpointRemoved(`key`, `key`, _, false), ActiveStates(worker, lease)) =>
9898
logging.info(this, s"endpoint ie removed so recreate a lease")
99-
recreateLease(lease)
99+
recreateLease(worker, lease)
100100

101-
case Event(RegrantLease, lease: Lease) =>
101+
case Event(RegrantLease, ActiveStates(worker, lease)) =>
102102
logging.info(this, s"ReGrant a lease, old lease:${lease}")
103-
recreateLease(lease)
103+
recreateLease(worker, lease)
104104

105-
case Event(GetLease, lease: Lease) =>
105+
case Event(GetLease, ActiveStates(_, lease)) =>
106106
logging.info(this, s"send the lease(${lease}) to ${sender()}")
107107
sender() ! lease
108108
stay()
@@ -112,18 +112,17 @@ class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watc
112112

113113
initialize()
114114

115-
private def startKeepAliveService(lease: Lease): Future[SetWatcher.type] = {
116-
worker = Some {
115+
private def startKeepAliveService(lease: Lease): Future[SetWatcher] = {
116+
val worker =
117117
actorSystem.scheduler.schedule(initialDelay = 0.second, interval = 500.milliseconds)(keepAliveOnce(lease))
118-
}
119118

120119
/**
121120
* To verify that lease has been deleted since timeout,
122121
* create a key using lease, watch the key, and receive an event for deletion.
123122
*/
124123
etcdClient.put(key, s"${lease.id}", lease.id).map { _ =>
125124
watcherService ! WatchEndpoint(key, s"${lease.id}", false, watcherName, Set(DeleteEvent))
126-
SetWatcher
125+
SetWatcher(worker)
127126
}
128127
}
129128

@@ -139,14 +138,14 @@ class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watc
139138
}
140139
}
141140

142-
private def recreateLease(lease: Lease) = {
141+
private def recreateLease(worker: Cancellable, lease: Lease) = {
143142
logging.info(this, s"recreate a lease, old lease: $lease")
144-
worker.foreach(_.cancel()) // stop scheduler
143+
worker.cancel() // stop scheduler
145144
watcherService ! UnwatchEndpoint(key, false, watcherName) // stop watcher
146145
etcdClient
147146
.revoke(lease.id) // delete lease
148147
.onComplete(_ => self ! GrantLease) // create lease
149-
goto(Ready)
148+
goto(Ready) using NoData
150149
}
151150

152151
// Unstash all messages stashed while in intermediate state
@@ -162,7 +161,10 @@ class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watc
162161
}
163162

164163
override def postStop(): Unit = {
165-
worker.foreach(_.cancel()) // stop scheduler if that exist
164+
stateData match {
165+
case ActiveStates(w, _) => w.cancel() // stop scheduler if that exist
166+
case _ => // do nothing
167+
}
166168
watcherService ! UnwatchEndpoint(key, false, watcherName)
167169
}
168170
}

tests/src/test/scala/org/apache/openwhisk/core/service/LeaseKeepAliveServiceTests.scala

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,11 @@ class LeaseKeepAliveServiceTests
101101

102102
Thread.sleep(1000)
103103
service.stateName shouldBe Active
104-
service.stateData shouldBe testLease
104+
service.stateData shouldBe a[ActiveStates]
105+
service.stateData match {
106+
case ActiveStates(_, lease) => lease shouldBe testLease
107+
case _ => fail()
108+
}
105109
watcher.expectMsg(WatchEndpoint(testKey, testLease.id.toString, false, watcherName, Set(DeleteEvent)))
106110

107111
}
@@ -134,7 +138,11 @@ class LeaseKeepAliveServiceTests
134138
val service = TestFSMRef(new LeaseKeepAliveService(mockEtcd, testInstanceId, watcher.ref))
135139

136140
service.stateName shouldBe Active
137-
service.stateData shouldBe testLease
141+
service.stateData shouldBe a[ActiveStates]
142+
service.stateData match {
143+
case ActiveStates(_, lease) => lease shouldBe testLease
144+
case _ => fail()
145+
}
138146
watcher.expectMsg(WatchEndpoint(testKey, testLease.id.toString, false, watcherName, Set(DeleteEvent)))
139147

140148
service ! WatchEndpointRemoved(testKey, testKey, testLease.id.toString, false)
@@ -143,7 +151,11 @@ class LeaseKeepAliveServiceTests
143151
Thread.sleep(500) //wait for the lease to be granted
144152

145153
service.stateName shouldBe Active
146-
service.stateData shouldBe newTestLease
154+
service.stateData shouldBe a[ActiveStates]
155+
service.stateData match {
156+
case ActiveStates(_, lease) => lease shouldBe newTestLease
157+
case _ => fail()
158+
}
147159
watcher.expectMsg(WatchEndpoint(newTestKey, newTestLease.id.toString, false, watcherName, Set(DeleteEvent)))
148160
}
149161

@@ -198,14 +210,22 @@ class LeaseKeepAliveServiceTests
198210
val watcher = TestProbe()
199211
val service = TestFSMRef(new LeaseKeepAliveService(mockEtcd, testInstanceId, watcher.ref))
200212
service.stateName shouldBe Active
201-
service.stateData shouldBe testLease
213+
service.stateData shouldBe a[ActiveStates]
214+
service.stateData match {
215+
case ActiveStates(_, lease) => lease shouldBe testLease
216+
case _ => fail()
217+
}
202218
watcher.expectMsg(WatchEndpoint(testKey, testLease.id.toString, false, watcherName, Set(DeleteEvent)))
203219

204220
watcher.expectMsg(UnwatchEndpoint(testKey, false, watcherName))
205221
Thread.sleep(1500) //wait for the lease to be granted
206222

207223
service.stateName shouldBe Active
208-
service.stateData shouldBe newTestLease
224+
service.stateData shouldBe a[ActiveStates]
225+
service.stateData match {
226+
case ActiveStates(_, lease) => lease shouldBe newTestLease
227+
case _ => fail()
228+
}
209229
watcher.expectMsg(WatchEndpoint(newTestKey, newTestLease.id.toString, false, watcherName, Set(DeleteEvent)))
210230
}
211231

0 commit comments

Comments
 (0)