Skip to content

Commit 224225f

Browse files
committed
Fix throttle and align with Combine.
1 parent 470657b commit 224225f

File tree

3 files changed

+73
-20
lines changed

3 files changed

+73
-20
lines changed

Sources/ExecutionContext.swift

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,8 @@ extension DispatchQueue {
106106
/// Schedule given block for execution after given interval passes.
107107
/// Scheduled execution can be cancelled by disposing the returned disposable.
108108
public func disposableAfter(when interval: Double, block: @escaping () -> Void) -> Disposable {
109-
let disposable = SimpleDisposable()
110-
asyncAfter(deadline: .now() + interval) {
111-
if !disposable.isDisposed {
112-
block()
113-
}
114-
}
115-
return disposable
109+
let workItem = DispatchWorkItem(block: block)
110+
asyncAfter(deadline: .now() + interval, execute: workItem)
111+
return BlockDisposable(workItem.cancel)
116112
}
117113
}

Sources/SignalProtocol+Filtering.swift

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -374,24 +374,45 @@ extension SignalProtocol {
374374
}
375375
}
376376

377-
/// Throttle the signal to emit at most one element per given `seconds` interval.
377+
/// Throttle the signal to emit at most one element per given `seconds` interval. Signal will emit latest element from each interval.
378378
///
379379
/// Check out interactive example at [https://rxmarbles.com/#throttle](https://rxmarbles.com/#throttle)
380-
public func throttle(for seconds: Double) -> Signal<Element, Error> {
380+
public func throttle(for seconds: Double, queue: DispatchQueue = DispatchQueue(label: "com.reactive_kit.signal.throttle")) -> Signal<Element, Error> {
381381
return Signal { observer in
382-
let lock = NSRecursiveLock(name: "com.reactive_kit.signal.throttle")
383-
var _lastEventTime: DispatchTime?
382+
var isInitialElement = true
383+
var throttledDisposable: Disposable? = nil
384+
var lastElement: Element? = nil
385+
var isFinished: Bool = false
384386
return self.observe { event in
385-
switch event {
386-
case .next(let element):
387-
lock.lock(); defer { lock.unlock() }
388-
let now = DispatchTime.now()
389-
if _lastEventTime == nil || now.rawValue > (_lastEventTime! + seconds).rawValue {
390-
_lastEventTime = now
391-
observer.receive(element)
387+
queue.async {
388+
switch event {
389+
case .next(let element):
390+
if isInitialElement {
391+
isInitialElement = false
392+
observer.receive(element)
393+
} else {
394+
lastElement = element
395+
}
396+
guard throttledDisposable == nil else { return }
397+
throttledDisposable = queue.disposableAfter(when: seconds) {
398+
if let element = lastElement {
399+
observer.receive(element)
400+
lastElement = nil
401+
}
402+
if isFinished {
403+
observer.receive(completion: .finished)
404+
}
405+
throttledDisposable = nil
406+
}
407+
case .failed(let error):
408+
observer.receive(completion: .failure(error))
409+
case .completed:
410+
guard throttledDisposable == nil else {
411+
isFinished = true
412+
return
413+
}
414+
observer.receive(completion: .finished)
392415
}
393-
default:
394-
observer.on(event)
395416
}
396417
}
397418
}

Tests/ReactiveKitTests/SignalTests.swift

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,42 @@ class SignalTests: XCTestCase {
309309
XCTAssertTrue(subscriber.isFinished)
310310
}
311311

312+
func testDebounce() {
313+
// event 0 @ 0.0s - debounced
314+
// event 1 @ 0.4s - debounced
315+
// event 2 @ 0.8s - debounced
316+
// event 3 @ 1.2s - debounced
317+
// event 4 @ 1.6s - debounced
318+
// timesup @ 2.6s - return 4
319+
// event 5 @ 3.6s - debounced
320+
// timesup @ 4.6s - return 5
321+
let values = Signal<Int, Never>(sequence: 0..<5, interval: 0.4)
322+
.append(Signal<Int, Never>(just: 5, after: 2))
323+
.debounce(for: 1)
324+
.waitAndCollectElements()
325+
XCTAssertEqual(values, [4, 5])
326+
}
327+
328+
func testThrottle() {
329+
// event 0 @ 0.0s - return 0
330+
// event 1 @ 0.4s - throttled
331+
// event 2 @ 0.8s - throttled
332+
// event 3 @ 1.2s - throttled
333+
// throttle timesup @ 1.5s - return 3
334+
// event 4 @ 1.6s - throttled
335+
// event 5 @ 2.0s - throttled
336+
// event 6 @ 2.4s - throttled
337+
// event 7 @ 2.8s - throttled
338+
// throttle timesup @ 3.0s - return 7
339+
// event 8 @ 3.2s - throttled
340+
// event 9 @ 3.6s - throttled
341+
// completed @ 3.6s - return 9
342+
let values = Signal<Int, TestError>(sequence: 0..<10, interval: 0.4)
343+
.throttle(for: 1.5)
344+
.waitAndCollectElements()
345+
XCTAssertEqual(values, [0, 3, 7, 9])
346+
}
347+
312348
func testIgnoreNils() {
313349
let subscriber = Subscribers.Accumulator<Int, TestError>()
314350
let publisher = Signal<Int?, TestError>(sequence: Array<Int?>([1, nil, 3])).ignoreNils()

0 commit comments

Comments
 (0)