Skip to content

Commit 526f011

Browse files
bdoyle0182Brendan Doylerabbah
authored
Reset / Overwrite invokerId for unique name in zookeeper manually (#5024)
* init * add instance id check for overwrite * Update core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala Co-authored-by: rodric rabbah <[email protected]> * Update core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala Co-authored-by: rodric rabbah <[email protected]> * assign based on invoker pool size Co-authored-by: Brendan Doyle <[email protected]> Co-authored-by: rodric rabbah <[email protected]>
1 parent 12ca4e3 commit 526f011

File tree

3 files changed

+89
-34
lines changed

3 files changed

+89
-34
lines changed

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala

Lines changed: 55 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,51 +22,80 @@ import org.apache.curator.framework.recipes.shared.SharedCount
2222
import org.apache.curator.retry.RetryUntilElapsed
2323
import org.apache.openwhisk.common.Logging
2424

25+
import scala.collection.JavaConverters._
26+
2527
/**
2628
* Computes the instanceId for invoker
2729
*
2830
* @param connectionString zooKeeper connection string
2931
*/
3032
private[invoker] class InstanceIdAssigner(connectionString: String)(implicit logger: Logging) {
3133

32-
def getId(name: String): Int = {
34+
def setAndGetId(name: String, overwriteId: Option[Int] = None): Int = {
3335
logger.info(this, s"invokerReg: creating zkClient to $connectionString")
3436
val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed
3537
val zkClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy)
3638
zkClient.start()
3739
zkClient.blockUntilConnected()
3840
logger.info(this, "invokerReg: connected to zookeeper")
3941

40-
val myIdPath = "/invokers/idAssignment/mapping/" + name
41-
val assignedId = Option(zkClient.checkExists().forPath(myIdPath)) match {
42-
case None =>
43-
// path doesn't exist -> no previous mapping for this invoker
44-
logger.info(this, s"invokerReg: no prior assignment of id for invoker $name")
45-
val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
46-
idCounter.start()
42+
val rootPath = "/invokers/idAssignment/mapping"
43+
val myIdPath = s"$rootPath/$name"
44+
val assignedId = overwriteId
45+
.map(newId => {
46+
val invokers = zkClient.getChildren.forPath(rootPath).asScala
4747

48-
def assignId(): Int = {
49-
val current = idCounter.getVersionedValue()
50-
if (idCounter.trySetCount(current, current.getValue() + 1)) {
51-
current.getValue()
52-
} else {
53-
assignId()
54-
}
55-
}
48+
if (invokers.size < newId)
49+
throw new IllegalArgumentException(s"invokerReg: cannot assign $newId to $name: not enough invokers")
50+
51+
//check if the invokerId already exists for another unique name and delete if it does
52+
invokers
53+
.map(uniqueName => {
54+
val idPath = s"$rootPath/$uniqueName"
55+
(idPath, BigInt(zkClient.getData.forPath(idPath)).intValue)
56+
})
57+
.find(_._2 == newId)
58+
.map(id => zkClient.delete().forPath(id._1))
59+
60+
zkClient.create().orSetData().forPath(myIdPath, BigInt(newId).toByteArray)
5661

57-
val newId = assignId()
58-
idCounter.close()
59-
zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
6062
logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
6163
newId
64+
})
65+
.getOrElse({
66+
Option(zkClient.checkExists().forPath(myIdPath)) match {
67+
case None =>
68+
// path doesn't exist -> no previous mapping for this invoker
69+
logger.info(this, s"invokerReg: no prior assignment of id for invoker $name")
70+
val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
71+
idCounter.start()
6272

63-
case Some(_) =>
64-
// path already exists -> there is a previous mapping for this invoker we should use
65-
val rawOldId = zkClient.getData().forPath(myIdPath)
66-
val oldId = BigInt(rawOldId).intValue
67-
logger.info(this, s"invokerReg: invoker $name was assigned its previous invokerId $oldId")
68-
oldId
69-
}
73+
def assignId(): Int = {
74+
val current = idCounter.getVersionedValue()
75+
val numInvokers = Option(zkClient.checkExists().forPath(rootPath))
76+
.map(_ => zkClient.getChildren.forPath(rootPath).size())
77+
.getOrElse(0)
78+
if (idCounter.trySetCount(current, numInvokers + 1)) {
79+
numInvokers
80+
} else {
81+
assignId()
82+
}
83+
}
84+
85+
val newId = assignId()
86+
idCounter.close()
87+
zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
88+
logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
89+
newId
90+
91+
case Some(_) =>
92+
// path already exists -> there is a previous mapping for this invoker we should use
93+
val rawOldId = zkClient.getData.forPath(myIdPath)
94+
val oldId = BigInt(rawOldId).intValue
95+
logger.info(this, s"invokerReg: invoker $name was assigned its previous invokerId $oldId")
96+
oldId
97+
}
98+
})
7099

71100
zkClient.close()
72101
assignedId

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ import scala.concurrent.duration._
4040
import scala.concurrent.{Await, ExecutionContext, Future}
4141
import scala.util.Try
4242

43-
case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
43+
case class CmdLineArgs(uniqueName: Option[String] = None,
44+
id: Option[Int] = None,
45+
displayedName: Option[String] = None,
46+
overwriteId: Option[Int] = None)
4447

4548
object Invoker {
4649

@@ -133,6 +136,8 @@ object Invoker {
133136
// --uniqueName <value> a unique name to dynamically assign Kafka topics from Zookeeper
134137
// --displayedName <value> a name to identify this invoker via invoker health protocol
135138
// --id <value> proposed invokerId
139+
// --overwriteId <value> proposed invokerId to re-write with uniqueName in Zookeeper,
140+
// DO NOT USE overwriteId unless sure invokerId does not exist for other uniqueName
136141
def parse(ls: List[String], c: CmdLineArgs): CmdLineArgs = {
137142
ls match {
138143
case "--uniqueName" :: uniqueName :: tail =>
@@ -141,6 +146,8 @@ object Invoker {
141146
parse(tail, c.copy(displayedName = nonEmptyString(displayedName)))
142147
case "--id" :: id :: tail if Try(id.toInt).isSuccess =>
143148
parse(tail, c.copy(id = Some(id.toInt)))
149+
case "--overwriteId" :: overwriteId :: tail if Try(overwriteId.toInt).isSuccess =>
150+
parse(tail, c.copy(overwriteId = Some(overwriteId.toInt)))
144151
case Nil => c
145152
case _ => abort(s"Error processing command line arguments $ls")
146153
}
@@ -150,16 +157,16 @@ object Invoker {
150157

151158
val assignedInvokerId = cmdLineArgs match {
152159
// --id is defined with a valid value, use this id directly.
153-
case CmdLineArgs(_, Some(id), _) =>
160+
case CmdLineArgs(_, Some(id), _, _) =>
154161
logger.info(this, s"invokerReg: using proposedInvokerId $id")
155162
id
156163

157164
// --uniqueName is defined with a valid value, id is empty, assign an id via zookeeper
158-
case CmdLineArgs(Some(unique), None, _) =>
165+
case CmdLineArgs(Some(unique), None, _, overwriteId) =>
159166
if (config.zookeeperHosts.startsWith(":") || config.zookeeperHosts.endsWith(":")) {
160167
abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHosts})")
161168
}
162-
new InstanceIdAssigner(config.zookeeperHosts).getId(unique)
169+
new InstanceIdAssigner(config.zookeeperHosts).setAndGetId(unique, overwriteId)
163170

164171
case _ => abort(s"Either --id or --uniqueName must be configured with correct values")
165172
}

tests/src/test/scala/org/apache/openwhisk/core/invoker/test/InstanceIdAssignerTests.scala

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,33 @@ class InstanceIdAssignerTests extends FlatSpec with Matchers with StreamLogging
4040

4141
it should "assign fresh id" in {
4242
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
43-
assigner.getId("foo") shouldBe 0
43+
assigner.setAndGetId("foo") shouldBe 0
4444
}
4545

4646
it should "reuse id if exists" in {
4747
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
48-
assigner.getId("foo") shouldBe 0
49-
assigner.getId("bar") shouldBe 1
50-
assigner.getId("bar") shouldBe 1
48+
assigner.setAndGetId("foo") shouldBe 0
49+
assigner.setAndGetId("bar") shouldBe 1
50+
assigner.setAndGetId("bar") shouldBe 1
5151
}
5252

53+
it should "attempt to overwrite id for unique name if overwrite set" in {
54+
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
55+
assigner.setAndGetId("foo") shouldBe 0
56+
assigner.setAndGetId("bar", Some(0)) shouldBe 0
57+
}
58+
59+
it should "overwrite an id for unique name that already exists and reset overwritten id" in {
60+
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
61+
assigner.setAndGetId("foo") shouldBe 0
62+
assigner.setAndGetId("bar", Some(0)) shouldBe 0
63+
assigner.setAndGetId("foo") shouldBe 1
64+
assigner.setAndGetId("cat") shouldBe 2
65+
}
66+
67+
it should "fail to overwrite an id too large for the invoker pool size" in {
68+
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
69+
assigner.setAndGetId("foo") shouldBe 0
70+
assertThrows[IllegalArgumentException](assigner.setAndGetId("bar", Some(2)))
71+
}
5372
}

0 commit comments

Comments
 (0)