Skip to content

Commit 1c05161

Browse files
authored
Updated ReplaySubject to replay its buffer to a new subscription while locked to ensure new values are sent after the replayed values. (#138)
1 parent 0addd96 commit 1c05161

File tree

2 files changed

+34
-7
lines changed

2 files changed

+34
-7
lines changed

Sources/Subjects/ReplaySubject.swift

+1-7
Original file line numberDiff line numberDiff line change
@@ -89,21 +89,15 @@ public final class ReplaySubject<Output, Failure: Error>: Subject {
8989
self?.completeSubscriber(withIdentifier: subscriberIdentifier)
9090
}
9191

92-
let buffer: [Output]
93-
let completion: Subscribers.Completion<Failure>?
94-
9592
do {
9693
lock.lock()
9794
defer { lock.unlock() }
9895

96+
subscription.replay(buffer, completion: completion)
9997
subscriptions.append(subscription)
100-
101-
buffer = self.buffer
102-
completion = self.completion
10398
}
10499

105100
subscriber.receive(subscription: subscription)
106-
subscription.replay(buffer, completion: completion)
107101
}
108102

109103
private func completeSubscriber(withIdentifier subscriberIdentifier: CombineIdentifier) {

Tests/ReplaySubjectTests.swift

+33
Original file line numberDiff line numberDiff line change
@@ -391,5 +391,38 @@ final class ReplaySubjectTests: XCTestCase {
391391

392392
XCTAssertTrue(subject.subscriptions.isEmpty)
393393
}
394+
395+
func testReplayOrderThreadSafety() async {
396+
continueAfterFailure = false
397+
// Loop to ensure any race condition is caught.
398+
for _ in 0..<5000 {
399+
let replaySubject = ReplaySubject<Int, Never>(bufferSize: 3)
400+
replaySubject.send(1)
401+
replaySubject.send(2)
402+
403+
// Use tasks to create a new subscription on one thread
404+
// while sending the third value on another thread.
405+
// The new subscription should always receive [1, 2, 3]
406+
await withTaskGroup(of: Void.self) { taskGroup in
407+
taskGroup.addTask {
408+
let output: [Int] = await withCheckedContinuation { continuation in
409+
var cancellable: AnyCancellable?
410+
cancellable = replaySubject.collect(3).first().sink(receiveValue: { value in
411+
continuation.resume(returning: value)
412+
withExtendedLifetime(cancellable) { cancellable = nil }
413+
})
414+
}
415+
416+
await MainActor.run {
417+
XCTAssertEqual(output, [1, 2, 3])
418+
}
419+
}
420+
421+
taskGroup.addTask {
422+
replaySubject.send(3)
423+
}
424+
}
425+
}
426+
}
394427
}
395428
#endif

0 commit comments

Comments
 (0)