Skip to content

Commit 1936fe0

Browse files
author
Brendan Doyle
committed
assign based on invoker pool size
1 parent aee8d9d commit 1936fe0

File tree

2 files changed

+60
-50
lines changed

2 files changed

+60
-50
lines changed

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala

Lines changed: 49 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -41,58 +41,61 @@ private[invoker] class InstanceIdAssigner(connectionString: String)(implicit log
4141

4242
val rootPath = "/invokers/idAssignment/mapping"
4343
val myIdPath = s"$rootPath/$name"
44-
val assignedId = if (overwriteId.isEmpty) {
45-
Option(zkClient.checkExists().forPath(myIdPath)) match {
46-
case None =>
47-
// path doesn't exist -> no previous mapping for this invoker
48-
logger.info(this, s"invokerReg: no prior assignment of id for invoker $name")
49-
val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
50-
idCounter.start()
44+
val assignedId = overwriteId
45+
.map(newId => {
46+
val invokers = zkClient.getChildren.forPath(rootPath).asScala
5147

52-
def assignId(): Int = {
53-
val current = idCounter.getVersionedValue()
54-
if (idCounter.trySetCount(current, current.getValue() + 1)) {
55-
current.getValue()
56-
} else {
57-
assignId()
58-
}
59-
}
48+
if (invokers.size < newId)
49+
throw new IllegalArgumentException(s"invokerReg: cannot assign $newId to $name: not enough invokers")
50+
51+
//check if the invokerId already exists for another unique name and delete if it does
52+
invokers
53+
.map(uniqueName => {
54+
val idPath = s"$rootPath/$uniqueName"
55+
(idPath, BigInt(zkClient.getData.forPath(idPath)).intValue)
56+
})
57+
.find(_._2 == newId)
58+
.map(id => zkClient.delete().forPath(id._1))
6059

61-
val newId = assignId()
62-
idCounter.close()
63-
zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
64-
logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
65-
newId
60+
zkClient.create().orSetData().forPath(myIdPath, BigInt(newId).toByteArray)
6661

67-
case Some(_) =>
68-
// path already exists -> there is a previous mapping for this invoker we should use
69-
val rawOldId = zkClient.getData().forPath(myIdPath)
70-
val oldId = BigInt(rawOldId).intValue
71-
logger.info(this, s"invokerReg: invoker $name was assigned its previous invokerId $oldId")
72-
oldId
73-
}
74-
} else {
75-
val newId = overwriteId.get
62+
logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
63+
newId
64+
})
65+
.getOrElse({
66+
Option(zkClient.checkExists().forPath(myIdPath)) match {
67+
case None =>
68+
// path doesn't exist -> no previous mapping for this invoker
69+
logger.info(this, s"invokerReg: no prior assignment of id for invoker $name")
70+
val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
71+
idCounter.start()
7672

77-
//check if the invokerId already exists for another unique name
78-
val instanceIdExists = zkClient
79-
.getChildren()
80-
.forPath(rootPath)
81-
.asScala
82-
.map(uniqueName => {
83-
val idPath = s"$rootPath/$uniqueName"
84-
BigInt(zkClient.getData().forPath(idPath)).intValue
85-
})
86-
.find(_ == newId)
73+
def assignId(): Int = {
74+
val current = idCounter.getVersionedValue()
75+
val numInvokers = Option(zkClient.checkExists().forPath(rootPath))
76+
.map(_ => zkClient.getChildren.forPath(rootPath).size())
77+
.getOrElse(0)
78+
if (idCounter.trySetCount(current, numInvokers + 1)) {
79+
numInvokers
80+
} else {
81+
assignId()
82+
}
83+
}
8784

88-
if (instanceIdExists.nonEmpty) {
89-
throw new IllegalArgumentException(s"invokerReg: an invoker with id $newId already exists in zookeeper")
90-
}
85+
val newId = assignId()
86+
idCounter.close()
87+
zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
88+
logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
89+
newId
9190

92-
zkClient.create().orSetData().forPath(myIdPath, BigInt(newId).toByteArray)
93-
logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
94-
newId
95-
}
91+
case Some(_) =>
92+
// path already exists -> there is a previous mapping for this invoker we should use
93+
val rawOldId = zkClient.getData.forPath(myIdPath)
94+
val oldId = BigInt(rawOldId).intValue
95+
logger.info(this, s"invokerReg: invoker $name was assigned its previous invokerId $oldId")
96+
oldId
97+
}
98+
})
9699

97100
zkClient.close()
98101
assignedId

tests/src/test/scala/org/apache/openwhisk/core/invoker/test/InstanceIdAssignerTests.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,20 @@ class InstanceIdAssignerTests extends FlatSpec with Matchers with StreamLogging
5353
it should "attempt to overwrite id for unique name if overwrite set" in {
5454
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
5555
assigner.setAndGetId("foo") shouldBe 0
56-
assigner.setAndGetId("foo", Some(2)) shouldBe 2
56+
assigner.setAndGetId("bar", Some(0)) shouldBe 0
5757
}
5858

59-
it should "fail to overwrite an id for unique name that already exists" in {
59+
it should "overwrite an id for unique name that already exists and reset overwritten id" in {
6060
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
6161
assigner.setAndGetId("foo") shouldBe 0
62-
assigner.setAndGetId("bar") shouldBe 1
63-
assertThrows[IllegalArgumentException](assigner.setAndGetId("bar", Some(1)))
62+
assigner.setAndGetId("bar", Some(0)) shouldBe 0
63+
assigner.setAndGetId("foo") shouldBe 1
64+
assigner.setAndGetId("cat") shouldBe 2
65+
}
66+
67+
it should "fail to overwrite an id too large for the invoker pool size" in {
68+
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
69+
assigner.setAndGetId("foo") shouldBe 0
70+
assertThrows[IllegalArgumentException](assigner.setAndGetId("bar", Some(2)))
6471
}
6572
}

0 commit comments

Comments
 (0)