Skip to content
This repository was archived by the owner on Feb 2, 2025. It is now read-only.

Commit c680c3d

Browse files
author
Alex Belozierov
committed
- added CoFuture init with promise callback
- added subscribeCoChannel() for Publisher - added publisher() to CoChannel
1 parent c3da301 commit c680c3d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+666
-72
lines changed

Sources/SwiftCoroutine/CoFuture/Core/CoFuture.swift

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,18 @@ extension CoFuture: _CoFutureCancellable {
113113
self.init(_result: nil)
114114
self.parent = parent
115115
}
116+
117+
/// Initializes a future that invokes a promise closure.
118+
/// ```
119+
/// func someAsyncFunc(callback: @escaping (Result<Int, Error>) -> Void) { ... }
120+
///
121+
/// let future = CoFuture(promise: someAsyncFunc)
122+
/// ```
123+
/// - Parameter promise: A closure to fulfill this future.
124+
@inlinable public convenience init<E: Error>(promise: (@escaping (Result<Value, E>) -> Void) -> Void) {
125+
self.init(_result: nil)
126+
promise(setResult2)
127+
}
116128

117129
/// Starts a new coroutine and initializes future with its result.
118130
///
@@ -154,6 +166,13 @@ extension CoFuture: _CoFutureCancellable {
154166
nodes.close()?.finish(with: result)
155167
}
156168

169+
@inlinable internal func setResult2<E: Error>(_ result: Result<Value, E>) {
170+
switch result {
171+
case .success(let value): setResult(.success(value))
172+
case .failure(let error): setResult(.failure(error))
173+
}
174+
}
175+
157176
// MARK: - Callback
158177

159178
@usableFromInline internal func addCallback(_ callback: @escaping (_Result) -> Void) {

Sources/SwiftCoroutine/CoFuture/Core/CoPromise.swift

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@ extension CoPromise {
2020
}
2121

2222
@inlinable public func complete<E: Error>(with result: Result<Value, E>) {
23-
switch result {
24-
case .success(let value): setResult(.success(value))
25-
case .failure(let error): setResult(.failure(error))
26-
}
23+
setResult2(result)
2724
}
2825

2926
@inlinable public func success(_ value: Value) {
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
//
2+
// CoChannel+Combine.swift
3+
// SwiftCoroutine
4+
//
5+
// Created by Alex Belozierov on 11.06.2020.
6+
// Copyright © 2020 Alex Belozierov. All rights reserved.
7+
//
8+
9+
#if canImport(Combine)
10+
import Combine
11+
12+
@available(OSX 10.15, iOS 13.0, *)
13+
extension CoChannel {
14+
15+
// MARK: - publisher
16+
17+
/// Returns a publisher that emits elements of this `CoChannel`.
18+
@inlinable public func publisher() -> AnyPublisher<Element, CoChannelError> {
19+
channel.publisher()
20+
}
21+
22+
}
23+
24+
@available(OSX 10.15, iOS 13.0, *)
25+
extension CoChannel.Receiver {
26+
27+
// MARK: - publisher
28+
29+
/// Returns a publisher that emits elements of this `Receiver`.
30+
public func publisher() -> AnyPublisher<Element, CoChannelError> {
31+
CoChannelPublisher(receiver: self).eraseToAnyPublisher()
32+
}
33+
34+
}
35+
36+
@available(OSX 10.15, iOS 13.0, *)
37+
extension Publisher {
38+
39+
/// Attaches `CoChannel.Receiver` as a subscriber and returns it.
40+
public func subscribeCoChannel(buffer: CoChannel<Output>.BufferType = .unlimited) -> CoChannel<Output>.Receiver {
41+
let channel = CoChannel<Output>(bufferType: buffer)
42+
let cancellable = sink(receiveCompletion: { _ in channel.close() },
43+
receiveValue: { channel.offer($0) })
44+
channel.whenCanceled(cancellable.cancel)
45+
return channel.receiver
46+
}
47+
48+
}
49+
#endif
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
//
2+
// CoChannelPublisher.swift
3+
// SwiftCoroutine
4+
//
5+
// Created by Alex Belozierov on 11.06.2020.
6+
// Copyright © 2020 Alex Belozierov. All rights reserved.
7+
//
8+
9+
#if canImport(Combine)
10+
import Combine
11+
12+
@available(OSX 10.15, iOS 13.0, *)
13+
internal final class CoChannelPublisher<Output> {
14+
15+
internal typealias Failure = CoChannelError
16+
internal let receiver: CoChannel<Output>.Receiver
17+
18+
@inlinable internal init(receiver: CoChannel<Output>.Receiver) {
19+
self.receiver = receiver
20+
}
21+
22+
}
23+
24+
@available(OSX 10.15, iOS 13.0, *)
25+
extension CoChannelPublisher: Publisher {
26+
27+
@inlinable internal func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
28+
let subscription = CoChannelSubscription(subscriber: subscriber, receiver: receiver)
29+
subscriber.receive(subscription: subscription)
30+
}
31+
32+
}
33+
#endif
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
//
2+
// CoChannelSubscription.swift
3+
// SwiftCoroutine
4+
//
5+
// Created by Alex Belozierov on 11.06.2020.
6+
// Copyright © 2020 Alex Belozierov. All rights reserved.
7+
//
8+
9+
#if canImport(Combine)
10+
import Combine
11+
12+
@available(OSX 10.15, iOS 13.0, *)
13+
internal final class CoChannelSubscription<S: Subscriber, T>: Subscription where S.Input == T, S.Failure == CoChannelError {
14+
15+
private let receiver: CoChannel<T>.Receiver
16+
private var subscriber: S?
17+
18+
@inlinable internal init(subscriber: S, receiver: CoChannel<T>.Receiver) {
19+
self.receiver = receiver
20+
self.subscriber = subscriber
21+
@inline(__always) func subscribe() {
22+
receiver.whenReceive { result in
23+
guard let subscriber = self.subscriber else { return }
24+
switch result {
25+
case .success(let result):
26+
_ = subscriber.receive(result)
27+
subscribe()
28+
case .failure(let error) where error == .canceled:
29+
subscriber.receive(completion: .failure(error))
30+
case .failure:
31+
subscriber.receive(completion: .finished)
32+
}
33+
}
34+
}
35+
subscribe()
36+
}
37+
38+
@inlinable internal func cancel() {
39+
subscriber = nil
40+
}
41+
42+
@inlinable internal func request(_ demand: Subscribers.Demand) {}
43+
44+
}
45+
#endif

Sources/SwiftCoroutine/CoFuture/Operators/Combine/CoFuturePublisher.swift renamed to Sources/SwiftCoroutine/CoFuture/Operators/Combine/CoFuture/CoFuturePublisher.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010
import Combine
1111

1212
@available(OSX 10.15, iOS 13.0, *)
13-
final class CoFuturePublisher<Output, Future: CoFuture<Output>> {
13+
internal final class CoFuturePublisher<Output> {
1414

15-
typealias Failure = Error
15+
internal typealias Failure = Error
1616

17-
let future: Future
17+
internal let future: CoFuture<Output>
1818

19-
@inlinable init(future: Future) {
19+
@inlinable internal init(future: CoFuture<Output>) {
2020
self.future = future
2121
}
2222

@@ -25,8 +25,8 @@ final class CoFuturePublisher<Output, Future: CoFuture<Output>> {
2525
@available(OSX 10.15, iOS 13.0, *)
2626
extension CoFuturePublisher: Publisher {
2727

28-
@inlinable func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
29-
let subscription = CoSubscription(subscriber: subscriber, future: future)
28+
@inlinable internal func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
29+
let subscription = CoFutureSubscription(subscriber: subscriber, future: future)
3030
subscriber.receive(subscription: subscription)
3131
}
3232

Sources/SwiftCoroutine/CoFuture/Operators/Combine/CoSubscription.swift renamed to Sources/SwiftCoroutine/CoFuture/Operators/Combine/CoFuture/CoFutureSubscription.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//
2-
// CoSubscription.swift
2+
// CoFutureSubscription.swift
33
// SwiftCoroutine
44
//
55
// Created by Alex Belozierov on 15.03.2020.
@@ -10,7 +10,7 @@
1010
import Combine
1111

1212
@available(OSX 10.15, iOS 13.0, *)
13-
internal final class CoSubscription<S: Subscriber, T>: Subscription where S.Input == T, S.Failure == Error {
13+
internal final class CoFutureSubscription<S: Subscriber, T>: Subscription where S.Input == T, S.Failure == Error {
1414

1515
private let future: CoFuture<T>
1616
private var subscriber: S?

0 commit comments

Comments
 (0)