Skip to content

Commit 5614943

Browse files
Fix race conditions in CurrentValueRelay (#3447)
* Test tweak test tweaks * Slow fix * Fix test compilation * nonrecursive lock * back to os_lock * undo renaming * visibility * Feedback --------- Co-authored-by: Stephen Celis <[email protected]>
1 parent c134e5a commit 5614943

File tree

2 files changed

+49
-11
lines changed

2 files changed

+49
-11
lines changed

Sources/ComposableArchitecture/Internal/CurrentValueRelay.swift

+19-11
Original file line numberDiff line numberDiff line change
@@ -79,23 +79,27 @@ extension CurrentValueRelay {
7979
}
8080

8181
func receive(_ value: Output) {
82-
guard let downstream else { return }
82+
self.lock.lock()
83+
84+
guard let downstream else {
85+
self.lock.unlock()
86+
return
87+
}
8388

8489
switch self.demand {
8590
case .unlimited:
91+
self.lock.unlock()
8692
// NB: Adding to unlimited demand has no effect and can be ignored.
8793
_ = downstream.receive(value)
8894

8995
case .none:
90-
self.lock.sync {
91-
self.receivedLastValue = false
92-
}
96+
self.receivedLastValue = false
97+
self.lock.unlock()
9398

9499
default:
95-
self.lock.sync {
96-
self.receivedLastValue = true
97-
self.demand -= 1
98-
}
100+
self.receivedLastValue = true
101+
self.demand -= 1
102+
self.lock.unlock()
99103
let moreDemand = downstream.receive(value)
100104
self.lock.sync {
101105
self.demand += moreDemand
@@ -106,14 +110,18 @@ extension CurrentValueRelay {
106110
func request(_ demand: Subscribers.Demand) {
107111
precondition(demand > 0, "Demand must be greater than zero")
108112

109-
guard let downstream else { return }
110-
111113
self.lock.lock()
114+
115+
guard let downstream else {
116+
self.lock.unlock()
117+
return
118+
}
119+
112120
self.demand += demand
113121

114122
guard
115123
!self.receivedLastValue,
116-
let value = self.upstream?.currentValue
124+
let value = self.upstream?.value
117125
else {
118126
self.lock.unlock()
119127
return

Tests/ComposableArchitectureTests/CurrentValueRelayTests.swift

+30
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,35 @@
2525

2626
_ = cancellable
2727
}
28+
29+
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
30+
func testConcurrentSendAndReceive() async {
31+
nonisolated(unsafe) let subject = CurrentValueRelay(0)
32+
let values = LockIsolated<Set<Int>>([])
33+
let cancellable = subject.sink { (value: Int) in
34+
values.withValue {
35+
_ = $0.insert(value)
36+
}
37+
}
38+
39+
let receives = Task.detached { @Sendable in
40+
for await _ in subject.values {}
41+
}
42+
43+
await withTaskGroup(of: Void.self) { group in
44+
for index in 1...1_000 {
45+
group.addTask { @Sendable in
46+
subject.send(index)
47+
}
48+
}
49+
}
50+
51+
receives.cancel()
52+
_ = await receives.value
53+
54+
XCTAssertEqual(values.value, Set(Array(0...1_000)))
55+
56+
_ = cancellable
57+
}
2858
}
2959
#endif

0 commit comments

Comments
 (0)