Skip to content

Commit 0addd96

Browse files
Fix a leak in CurrentValueRelay. (#137)
* Bug Fix: Fix a leak in CurrentValueRelay. * - Fix a crash in CombineExt caused by the fix for a leak in CurrentValueRelay. * Minor documentation updates. * Update Sources/Common/Sink.swift Co-authored-by: Shai Mishali <[email protected]>
1 parent ca170dd commit 0addd96

File tree

5 files changed

+241
-15
lines changed

5 files changed

+241
-15
lines changed

.DS_Store

0 Bytes
Binary file not shown.
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
11
{
2-
"object": {
3-
"pins": [
4-
{
5-
"package": "combine-schedulers",
6-
"repositoryURL": "https://github.com/pointfreeco/combine-schedulers",
7-
"state": {
8-
"branch": null,
9-
"revision": "afc84b6a3639198b7b8b6d79f04eb3c2ee590d29",
10-
"version": "0.1.1"
11-
}
2+
"pins" : [
3+
{
4+
"identity" : "combine-schedulers",
5+
"kind" : "remoteSourceControl",
6+
"location" : "https://github.com/pointfreeco/combine-schedulers",
7+
"state" : {
8+
"revision" : "4cf088c29a20f52be0f2ca54992b492c54e0076b",
9+
"version" : "0.5.3"
1210
}
13-
]
14-
},
15-
"version": 1
11+
},
12+
{
13+
"identity" : "xctest-dynamic-overlay",
14+
"kind" : "remoteSourceControl",
15+
"location" : "https://github.com/pointfreeco/xctest-dynamic-overlay",
16+
"state" : {
17+
"revision" : "ef8e14e7ce1c0c304c644c6ba365d06c468ded6b",
18+
"version" : "0.3.3"
19+
}
20+
}
21+
],
22+
"version" : 2
1623
}

Sources/Common/Sink.swift

+15-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class Sink<Upstream: Publisher, Downstream: Subscriber>: Subscriber {
2121
private var upstreamSubscription: Subscription?
2222
private let transformOutput: TransformOutput?
2323
private let transformFailure: TransformFailure?
24+
private var upstreamIsCancelled = false
2425

2526
/// Initialize a new sink subscribing to the upstream publisher and
2627
/// fulfilling the demand of the downstream subscriber using a backpresurre
@@ -41,7 +42,18 @@ class Sink<Upstream: Publisher, Downstream: Subscriber>: Subscriber {
4142
self.buffer = DemandBuffer(subscriber: downstream)
4243
self.transformOutput = transformOutput
4344
self.transformFailure = transformFailure
44-
upstream.subscribe(self)
45+
46+
// A subscription can only be cancelled once. The `upstreamIsCancelled` value
47+
// is used to suppress a second call to cancel when the Sink is deallocated,
48+
// when a sink receives completion, and when a custom operator like `withLatestFrom`
49+
// calls `cancelUpstream()` manually.
50+
upstream
51+
.handleEvents(
52+
receiveCancel: { [weak self] in
53+
self?.upstreamIsCancelled = true
54+
}
55+
)
56+
.subscribe(self)
4557
}
4658

4759
func demand(_ demand: Subscribers.Demand) {
@@ -93,6 +105,8 @@ class Sink<Upstream: Publisher, Downstream: Subscriber>: Subscriber {
93105
}
94106

95107
func cancelUpstream() {
108+
guard upstreamIsCancelled == false else { return }
109+
96110
upstreamSubscription.kill()
97111
}
98112

Sources/Relays/CurrentValueRelay.swift

+2-1
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,15 @@ private extension CurrentValueRelay {
7575
func forceFinish() {
7676
self.sink?.shouldForwardCompletion = true
7777
self.sink?.receive(completion: .finished)
78+
self.sink = nil
7879
}
7980

8081
func request(_ demand: Subscribers.Demand) {
8182
sink?.demand(demand)
8283
}
8384

8485
func cancel() {
85-
sink = nil
86+
forceFinish()
8687
}
8788
}
8889
}

Tests/CurrentValueRelayTests.swift

+204
Original file line numberDiff line numberDiff line change
@@ -120,5 +120,209 @@ class CurrentValueRelayTests: XCTestCase {
120120
XCTAssertFalse(completed)
121121
XCTAssertEqual(values, ["initial", "1", "2", "3"])
122122
}
123+
124+
// There was a race condition which caused the value of a relay
125+
// to leak. Details of the race condition are in this PR:
126+
//
127+
// https://github.com/CombineCommunity/CombineExt/pull/137
128+
//
129+
// The easiest way to reproduce the race condition is
130+
// to initialize `cancellables` before `relay`.
131+
// The first two tests confirm the value of the relay is
132+
// released regardless of when cancellables is initialized.
133+
//
134+
// The last two tests check the scenario where a relay is
135+
// chained with a withLatestFrom operator. This leads
136+
// to two objects being leaked if cancellables is initialized
137+
// before the relays.
138+
final class StoredObject {
139+
static var storedObjectReleased = false
140+
141+
let value = 10
142+
143+
init() {
144+
Self.storedObjectReleased = false
145+
}
146+
147+
deinit {
148+
Self.storedObjectReleased = true
149+
}
150+
}
151+
152+
final class StoredObject2 {
153+
static var storedObjectReleased = false
154+
155+
let value = 20
156+
157+
init() {
158+
Self.storedObjectReleased = false
159+
}
160+
161+
deinit {
162+
Self.storedObjectReleased = true
163+
}
164+
}
165+
166+
func testStoredObjectIsDeallocatedWhenRelayIsDeallocatedAndDeclaredAfterCancellables() {
167+
final class ContainerClass {
168+
static var receivedCompletion = false
169+
static var receivedCancel = false
170+
171+
// Cancellables comes before the relay.
172+
var cancellables = Set<AnyCancellable>()
173+
let relay = CurrentValueRelay(StoredObject())
174+
175+
init() {
176+
relay
177+
.handleEvents(receiveCancel: {
178+
Self.receivedCancel = true
179+
})
180+
.sink(
181+
receiveCompletion: { _ in
182+
Self.receivedCompletion = true
183+
},
184+
receiveValue: { _ in }
185+
)
186+
.store(in: &cancellables)
187+
}
188+
}
189+
190+
var container: ContainerClass? = ContainerClass()
191+
192+
XCTAssertFalse(ContainerClass.receivedCompletion)
193+
XCTAssertFalse(StoredObject.storedObjectReleased)
194+
container = nil
195+
XCTAssertTrue(StoredObject.storedObjectReleased)
196+
XCTAssertNil(container)
197+
198+
// In this case the cancellables is deallocated before the relay.
199+
// The deinit method of AnyCancellable calls cancel for all subscriptions.
200+
// Completion will never be called for a canceled subscription.
201+
XCTAssertFalse(ContainerClass.receivedCompletion)
202+
XCTAssertTrue(ContainerClass.receivedCancel)
203+
}
204+
205+
func testStoredObjectIsDeallocatedWhenRelayIsDeallocatedAndDeclaredBeforeCancellables() {
206+
final class ContainerClass {
207+
static var receivedCompletion = false
208+
static var receivedCancel = false
209+
210+
// Cancellables comes after the relay.
211+
let relay = CurrentValueRelay(StoredObject())
212+
var cancellables = Set<AnyCancellable>()
213+
214+
init() {
215+
relay
216+
.handleEvents(receiveCancel: {
217+
Self.receivedCancel = true
218+
})
219+
.sink(
220+
receiveCompletion: { _ in
221+
Self.receivedCompletion = true
222+
},
223+
receiveValue: { _ in }
224+
)
225+
.store(in: &cancellables)
226+
}
227+
}
228+
229+
var container: ContainerClass? = ContainerClass()
230+
231+
XCTAssertFalse(ContainerClass.receivedCompletion)
232+
XCTAssertFalse(StoredObject.storedObjectReleased)
233+
container = nil
234+
XCTAssertTrue(StoredObject.storedObjectReleased)
235+
XCTAssertNil(container)
236+
237+
// In this case the cancellables is deinited after the CurrentValueRelay,
238+
// so completion will be called. Since the relay was completed, cancel will
239+
// not be called.
240+
XCTAssertTrue(ContainerClass.receivedCompletion)
241+
XCTAssertFalse(ContainerClass.receivedCancel)
242+
}
243+
244+
func testBothStoredObjectsAreDeallocatedWhenRelayAndWithLatestFromOperatorAreDeallocatedAndDeclaredBeforeCancellables() {
245+
final class ContainerClass {
246+
static var receivedCompletion = false
247+
static var receivedCancel = false
248+
249+
// Cancellables comes after the relay. In this case, there
250+
// is no leak.
251+
let relay = CurrentValueRelay(StoredObject())
252+
let relay2 = CurrentValueRelay(StoredObject2())
253+
var cancellables: Set<AnyCancellable>? = Set<AnyCancellable>()
254+
255+
init() {
256+
relay
257+
.withLatestFrom(relay2)
258+
.handleEvents(receiveCancel: {
259+
Self.receivedCancel = true
260+
})
261+
.sink(
262+
receiveCompletion: { _ in
263+
Self.receivedCompletion = true
264+
},
265+
receiveValue: { _ in }
266+
)
267+
.store(in: &cancellables!)
268+
}
269+
}
270+
271+
var container: ContainerClass? = ContainerClass()
272+
273+
XCTAssertFalse(ContainerClass.receivedCompletion)
274+
XCTAssertFalse(StoredObject.storedObjectReleased)
275+
XCTAssertFalse(StoredObject2.storedObjectReleased)
276+
// When the leak was fixed, the stream started crashing because cancel
277+
// was called twice on relay. A fix for the crash was added,
278+
// so setting the container to nil which deallocates cancellables
279+
// confirms there is no crash.
280+
container = nil
281+
XCTAssertTrue(StoredObject.storedObjectReleased)
282+
XCTAssertTrue(StoredObject2.storedObjectReleased)
283+
XCTAssertNil(container)
284+
}
285+
286+
func testBothStoredObjectsAreDeallocatedWhenRelayAndWithLatestFromOperatorAreDeallocatedAndDeclaredAfterCancellables() {
287+
final class ContainerClass {
288+
static var receivedCompletion = false
289+
static var receivedCancel = false
290+
291+
// Cancellables comes before the relay. In this case, the objects
292+
// for both relays leak.
293+
var cancellables: Set<AnyCancellable>? = Set<AnyCancellable>()
294+
let relay = CurrentValueRelay(StoredObject())
295+
let relay2 = CurrentValueRelay(StoredObject2())
296+
297+
init() {
298+
relay
299+
.withLatestFrom(relay2)
300+
.handleEvents(receiveCancel: {
301+
Self.receivedCancel = true
302+
})
303+
.sink(
304+
receiveCompletion: { _ in
305+
Self.receivedCompletion = true
306+
},
307+
receiveValue: { _ in }
308+
)
309+
.store(in: &cancellables!)
310+
}
311+
}
312+
313+
var container: ContainerClass? = ContainerClass()
314+
315+
XCTAssertFalse(ContainerClass.receivedCompletion)
316+
XCTAssertFalse(StoredObject.storedObjectReleased)
317+
XCTAssertFalse(StoredObject2.storedObjectReleased)
318+
// When the leak was fixed, the stream started crashing because cancel
319+
// was called twice on relay. A fix for the crash was added,
320+
// so setting the container to nil which deallocates cancellables
321+
// confirms there is no crash.
322+
container = nil
323+
XCTAssertTrue(StoredObject.storedObjectReleased)
324+
XCTAssertTrue(StoredObject2.storedObjectReleased)
325+
XCTAssertNil(container)
326+
}
123327
}
124328
#endif

0 commit comments

Comments
 (0)