@@ -41,58 +41,61 @@ private[invoker] class InstanceIdAssigner(connectionString: String)(implicit log
41
41
42
42
val rootPath = " /invokers/idAssignment/mapping"
43
43
val myIdPath = s " $rootPath/ $name"
44
- val assignedId = if (overwriteId.isEmpty) {
45
- Option (zkClient.checkExists().forPath(myIdPath)) match {
46
- case None =>
47
- // path doesn't exist -> no previous mapping for this invoker
48
- logger.info(this , s " invokerReg: no prior assignment of id for invoker $name" )
49
- val idCounter = new SharedCount (zkClient, " /invokers/idAssignment/counter" , 0 )
50
- idCounter.start()
44
+ val assignedId = overwriteId
45
+ .map(newId => {
46
+ val invokers = zkClient.getChildren.forPath(rootPath).asScala
51
47
52
- def assignId (): Int = {
53
- val current = idCounter.getVersionedValue()
54
- if (idCounter.trySetCount(current, current.getValue() + 1 )) {
55
- current.getValue()
56
- } else {
57
- assignId()
58
- }
59
- }
48
+ if (invokers.size < newId)
49
+ throw new IllegalArgumentException (s " invokerReg: cannot assign $newId to $name: not enough invokers " )
50
+
51
+ // check if the invokerId already exists for another unique name and delete if it does
52
+ invokers
53
+ .map(uniqueName => {
54
+ val idPath = s " $rootPath/ $uniqueName"
55
+ (idPath, BigInt (zkClient.getData.forPath(idPath)).intValue)
56
+ })
57
+ .find(_._2 == newId)
58
+ .map(id => zkClient.delete().forPath(id._1))
60
59
61
- val newId = assignId()
62
- idCounter.close()
63
- zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt (newId).toByteArray)
64
- logger.info(this , s " invokerReg: invoker $name was assigned invokerId $newId" )
65
- newId
60
+ zkClient.create().orSetData().forPath(myIdPath, BigInt (newId).toByteArray)
66
61
67
- case Some (_) =>
68
- // path already exists -> there is a previous mapping for this invoker we should use
69
- val rawOldId = zkClient.getData().forPath(myIdPath)
70
- val oldId = BigInt (rawOldId).intValue
71
- logger.info(this , s " invokerReg: invoker $name was assigned its previous invokerId $oldId" )
72
- oldId
73
- }
74
- } else {
75
- val newId = overwriteId.get
62
+ logger.info(this , s " invokerReg: invoker $name was assigned invokerId $newId" )
63
+ newId
64
+ })
65
+ .getOrElse({
66
+ Option (zkClient.checkExists().forPath(myIdPath)) match {
67
+ case None =>
68
+ // path doesn't exist -> no previous mapping for this invoker
69
+ logger.info(this , s " invokerReg: no prior assignment of id for invoker $name" )
70
+ val idCounter = new SharedCount (zkClient, " /invokers/idAssignment/counter" , 0 )
71
+ idCounter.start()
76
72
77
- // check if the invokerId already exists for another unique name
78
- val instanceIdExists = zkClient
79
- .getChildren()
80
- .forPath(rootPath)
81
- .asScala
82
- .map(uniqueName => {
83
- val idPath = s " $rootPath/ $uniqueName"
84
- BigInt (zkClient.getData().forPath(idPath)).intValue
85
- })
86
- .find(_ == newId)
73
+ def assignId (): Int = {
74
+ val current = idCounter.getVersionedValue()
75
+ val numInvokers = Option (zkClient.checkExists().forPath(rootPath))
76
+ .map(_ => zkClient.getChildren.forPath(rootPath).size())
77
+ .getOrElse(0 )
78
+ if (idCounter.trySetCount(current, numInvokers + 1 )) {
79
+ numInvokers
80
+ } else {
81
+ assignId()
82
+ }
83
+ }
87
84
88
- if (instanceIdExists.nonEmpty) {
89
- throw new IllegalArgumentException (s " invokerReg: an invoker with id $newId already exists in zookeeper " )
90
- }
85
+ val newId = assignId()
86
+ idCounter.close()
87
+ zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt (newId).toByteArray)
88
+ logger.info(this , s " invokerReg: invoker $name was assigned invokerId $newId" )
89
+ newId
91
90
92
- zkClient.create().orSetData().forPath(myIdPath, BigInt (newId).toByteArray)
93
- logger.info(this , s " invokerReg: invoker $name was assigned invokerId $newId" )
94
- newId
95
- }
91
+ case Some (_) =>
92
+ // path already exists -> there is a previous mapping for this invoker we should use
93
+ val rawOldId = zkClient.getData.forPath(myIdPath)
94
+ val oldId = BigInt (rawOldId).intValue
95
+ logger.info(this , s " invokerReg: invoker $name was assigned its previous invokerId $oldId" )
96
+ oldId
97
+ }
98
+ })
96
99
97
100
zkClient.close()
98
101
assignedId
0 commit comments