Skip to content

Commit 8cfd314

Browse files
authored
Prevent a potential race condition that can cause self assignments to be replaced (#148)
1 parent c0c00bf commit 8cfd314

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

entities/src/main/scala/com/devsisters/shardcake/Sharding.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,14 +140,14 @@ class Sharding private (
140140

141141
private def updateAssignments(
142142
assignmentsOpt: Map[ShardId, Option[PodAddress]],
143-
fromShardManager: Boolean
143+
replaceAllAssignments: Boolean
144144
): UIO[Unit] = {
145145
val assignments = assignmentsOpt.flatMap { case (k, v) => v.map(k -> _) }
146146
ZIO.logDebug("Received new shard assignments") *>
147147
Metrics.shards
148148
.set(assignmentsOpt.count { case (_, podOpt) => podOpt.contains(address) })
149-
.when(fromShardManager) *>
150-
(if (fromShardManager) shardAssignments.set(assignments)
149+
.when(replaceAllAssignments) *>
150+
(if (replaceAllAssignments) shardAssignments.set(assignments)
151151
else
152152
shardAssignments.update(map =>
153153
// we keep self assignments (we don't override them with the new assignments
@@ -166,8 +166,8 @@ class Sharding private (
166166
) ++
167167
// then, get assignments changes from Redis
168168
storage.assignmentsStream.map(_ -> false)
169-
_ <- assignmentStream.mapZIO { case (assignmentsOpt, fromShardManager) =>
170-
updateAssignments(assignmentsOpt, fromShardManager) *> latch.succeed(()).when(fromShardManager)
169+
_ <- assignmentStream.mapZIO { case (assignmentsOpt, replaceAllAssignments) =>
170+
updateAssignments(assignmentsOpt, replaceAllAssignments) *> latch.succeed(()).when(replaceAllAssignments)
171171
}.runDrain
172172
.retry(Schedule.fixed(config.refreshAssignmentsRetryInterval))
173173
.interruptible
@@ -254,7 +254,7 @@ class Sharding private (
254254
(shardManager.notifyUnhealthyPod(pod) *>
255255
// just in case we missed the update from the pubsub, refresh assignments
256256
shardManager.getAssignments
257-
.flatMap[Any, Throwable, Unit](updateAssignments(_, fromShardManager = true))).forkDaemon
257+
.flatMap[Any, Throwable, Unit](updateAssignments(_, replaceAllAssignments = false))).forkDaemon
258258
)
259259
}
260260

0 commit comments

Comments
 (0)