@@ -41,7 +41,26 @@ private[invoker] class InstanceIdAssigner(connectionString: String)(implicit log
41
41
42
42
val rootPath = " /invokers/idAssignment/mapping"
43
43
val myIdPath = s " $rootPath/ $name"
44
- val assignedId = if (overwriteId.isEmpty) {
44
+ val assignedId = overwriteId.map(newId => {
45
+ val invokers = zkClient.getChildren.forPath(rootPath).asScala
46
+
47
+ if (invokers.size < newId)
48
+ throw new IllegalArgumentException (s " invokerReg: cannot assign $newId to $name: not enough invokers " )
49
+
50
+ // check if the invokerId already exists for another unique name and delete if it does
51
+ invokers
52
+ .map(uniqueName => {
53
+ val idPath = s " $rootPath/ $uniqueName"
54
+ (idPath, BigInt (zkClient.getData.forPath(idPath)).intValue)
55
+ })
56
+ .find(_._2 == newId)
57
+ .map(id => zkClient.delete().forPath(id._1))
58
+
59
+ zkClient.create().orSetData().forPath(myIdPath, BigInt (newId).toByteArray)
60
+
61
+ logger.info(this , s " invokerReg: invoker $name was assigned invokerId $newId" )
62
+ newId
63
+ }).getOrElse({
45
64
Option (zkClient.checkExists().forPath(myIdPath)) match {
46
65
case None =>
47
66
// path doesn't exist -> no previous mapping for this invoker
@@ -51,8 +70,11 @@ private[invoker] class InstanceIdAssigner(connectionString: String)(implicit log
51
70
52
71
def assignId (): Int = {
53
72
val current = idCounter.getVersionedValue()
54
- if (idCounter.trySetCount(current, current.getValue() + 1 )) {
55
- current.getValue()
73
+ val numInvokers = Option (zkClient.checkExists().forPath(rootPath))
74
+ .map(_ => zkClient.getChildren.forPath(rootPath).size())
75
+ .getOrElse(0 )
76
+ if (idCounter.trySetCount(current, numInvokers + 1 )) {
77
+ numInvokers
56
78
} else {
57
79
assignId()
58
80
}
@@ -66,33 +88,12 @@ private[invoker] class InstanceIdAssigner(connectionString: String)(implicit log
66
88
67
89
case Some (_) =>
68
90
// path already exists -> there is a previous mapping for this invoker we should use
69
- val rawOldId = zkClient.getData() .forPath(myIdPath)
91
+ val rawOldId = zkClient.getData.forPath(myIdPath)
70
92
val oldId = BigInt (rawOldId).intValue
71
93
logger.info(this , s " invokerReg: invoker $name was assigned its previous invokerId $oldId" )
72
94
oldId
73
95
}
74
- } else {
75
- val newId = overwriteId.get
76
-
77
- // check if the invokerId already exists for another unique name
78
- val instanceIdExists = zkClient
79
- .getChildren()
80
- .forPath(rootPath)
81
- .asScala
82
- .map(uniqueName => {
83
- val idPath = s " $rootPath/ $uniqueName"
84
- BigInt (zkClient.getData().forPath(idPath)).intValue
85
- })
86
- .find(_ == newId)
87
-
88
- if (instanceIdExists.nonEmpty) {
89
- throw new IllegalArgumentException (s " invokerReg: an invoker with id $newId already exists in zookeeper " )
90
- }
91
-
92
- zkClient.create().orSetData().forPath(myIdPath, BigInt (newId).toByteArray)
93
- logger.info(this , s " invokerReg: invoker $name was assigned invokerId $newId" )
94
- newId
95
- }
96
+ })
96
97
97
98
zkClient.close()
98
99
assignedId
0 commit comments