Skip to content

Reset / Overwrite invokerId for unique name in zookeeper manually #5024

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,51 +22,80 @@ import org.apache.curator.framework.recipes.shared.SharedCount
import org.apache.curator.retry.RetryUntilElapsed
import org.apache.openwhisk.common.Logging

import scala.collection.JavaConverters._

/**
* Computes the instanceId for invoker
*
* @param connectionString zooKeeper connection string
*/
private[invoker] class InstanceIdAssigner(connectionString: String)(implicit logger: Logging) {

def getId(name: String): Int = {
def setAndGetId(name: String, overwriteId: Option[Int] = None): Int = {
logger.info(this, s"invokerReg: creating zkClient to $connectionString")
val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed
val zkClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy)
zkClient.start()
zkClient.blockUntilConnected()
logger.info(this, "invokerReg: connected to zookeeper")

val myIdPath = "/invokers/idAssignment/mapping/" + name
val assignedId = Option(zkClient.checkExists().forPath(myIdPath)) match {
case None =>
// path doesn't exist -> no previous mapping for this invoker
logger.info(this, s"invokerReg: no prior assignment of id for invoker $name")
val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
idCounter.start()
val rootPath = "/invokers/idAssignment/mapping"
val myIdPath = s"$rootPath/$name"
val assignedId = overwriteId
.map(newId => {
val invokers = zkClient.getChildren.forPath(rootPath).asScala

def assignId(): Int = {
val current = idCounter.getVersionedValue()
if (idCounter.trySetCount(current, current.getValue() + 1)) {
current.getValue()
} else {
assignId()
}
}
if (invokers.size < newId)
throw new IllegalArgumentException(s"invokerReg: cannot assign $newId to $name: not enough invokers")

//check if the invokerId already exists for another unique name and delete if it does
invokers
.map(uniqueName => {
val idPath = s"$rootPath/$uniqueName"
(idPath, BigInt(zkClient.getData.forPath(idPath)).intValue)
})
.find(_._2 == newId)
.map(id => zkClient.delete().forPath(id._1))

zkClient.create().orSetData().forPath(myIdPath, BigInt(newId).toByteArray)

val newId = assignId()
idCounter.close()
zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
newId
})
.getOrElse({
Option(zkClient.checkExists().forPath(myIdPath)) match {
case None =>
// path doesn't exist -> no previous mapping for this invoker
logger.info(this, s"invokerReg: no prior assignment of id for invoker $name")
val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
idCounter.start()

case Some(_) =>
// path already exists -> there is a previous mapping for this invoker we should use
val rawOldId = zkClient.getData().forPath(myIdPath)
val oldId = BigInt(rawOldId).intValue
logger.info(this, s"invokerReg: invoker $name was assigned its previous invokerId $oldId")
oldId
}
def assignId(): Int = {
val current = idCounter.getVersionedValue()
val numInvokers = Option(zkClient.checkExists().forPath(rootPath))
.map(_ => zkClient.getChildren.forPath(rootPath).size())
.getOrElse(0)
if (idCounter.trySetCount(current, numInvokers + 1)) {
numInvokers
} else {
assignId()
}
}

val newId = assignId()
idCounter.close()
zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
newId

case Some(_) =>
// path already exists -> there is a previous mapping for this invoker we should use
val rawOldId = zkClient.getData.forPath(myIdPath)
val oldId = BigInt(rawOldId).intValue
logger.info(this, s"invokerReg: invoker $name was assigned its previous invokerId $oldId")
oldId
}
})

zkClient.close()
assignedId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Try

case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
case class CmdLineArgs(uniqueName: Option[String] = None,
id: Option[Int] = None,
displayedName: Option[String] = None,
overwriteId: Option[Int] = None)

object Invoker {

Expand Down Expand Up @@ -133,6 +136,8 @@ object Invoker {
// --uniqueName <value> a unique name to dynamically assign Kafka topics from Zookeeper
// --displayedName <value> a name to identify this invoker via invoker health protocol
// --id <value> proposed invokerId
// --overwriteId <value> proposed invokerId to re-write with uniqueName in Zookeeper,
// DO NOT USE overwriteId unless sure invokerId does not exist for other uniqueName
def parse(ls: List[String], c: CmdLineArgs): CmdLineArgs = {
ls match {
case "--uniqueName" :: uniqueName :: tail =>
Expand All @@ -141,6 +146,8 @@ object Invoker {
parse(tail, c.copy(displayedName = nonEmptyString(displayedName)))
case "--id" :: id :: tail if Try(id.toInt).isSuccess =>
parse(tail, c.copy(id = Some(id.toInt)))
case "--overwriteId" :: overwriteId :: tail if Try(overwriteId.toInt).isSuccess =>
parse(tail, c.copy(overwriteId = Some(overwriteId.toInt)))
case Nil => c
case _ => abort(s"Error processing command line arguments $ls")
}
Expand All @@ -150,16 +157,16 @@ object Invoker {

val assignedInvokerId = cmdLineArgs match {
// --id is defined with a valid value, use this id directly.
case CmdLineArgs(_, Some(id), _) =>
case CmdLineArgs(_, Some(id), _, _) =>
logger.info(this, s"invokerReg: using proposedInvokerId $id")
id

// --uniqueName is defined with a valid value, id is empty, assign an id via zookeeper
case CmdLineArgs(Some(unique), None, _) =>
case CmdLineArgs(Some(unique), None, _, overwriteId) =>
if (config.zookeeperHosts.startsWith(":") || config.zookeeperHosts.endsWith(":")) {
abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHosts})")
}
new InstanceIdAssigner(config.zookeeperHosts).getId(unique)
new InstanceIdAssigner(config.zookeeperHosts).setAndGetId(unique, overwriteId)

case _ => abort(s"Either --id or --uniqueName must be configured with correct values")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,33 @@ class InstanceIdAssignerTests extends FlatSpec with Matchers with StreamLogging

it should "assign fresh id" in {
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
assigner.getId("foo") shouldBe 0
assigner.setAndGetId("foo") shouldBe 0
}

it should "reuse id if exists" in {
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
assigner.getId("foo") shouldBe 0
assigner.getId("bar") shouldBe 1
assigner.getId("bar") shouldBe 1
assigner.setAndGetId("foo") shouldBe 0
assigner.setAndGetId("bar") shouldBe 1
assigner.setAndGetId("bar") shouldBe 1
}

it should "attempt to overwrite id for unique name if overwrite set" in {
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
assigner.setAndGetId("foo") shouldBe 0
assigner.setAndGetId("bar", Some(0)) shouldBe 0
}

it should "overwrite an id for unique name that already exists and reset overwritten id" in {
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
assigner.setAndGetId("foo") shouldBe 0
assigner.setAndGetId("bar", Some(0)) shouldBe 0
assigner.setAndGetId("foo") shouldBe 1
assigner.setAndGetId("cat") shouldBe 2
}

it should "fail to overwrite an id too large for the invoker pool size" in {
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
assigner.setAndGetId("foo") shouldBe 0
assertThrows[IllegalArgumentException](assigner.setAndGetId("bar", Some(2)))
}
}