@@ -12,111 +12,227 @@ import Combine
12
12
// MARK: - Operator methods
13
13
@available ( OSX 10 . 15 , iOS 13 . 0 , tvOS 13 . 0 , watchOS 6 . 0 , * )
14
14
public extension Publisher {
15
- /// Merges two publishers into a single publisher by combining each value
16
- /// from self with the latest value from the second publisher, if any.
17
- ///
18
- /// - parameter other: A second publisher source.
19
- /// - parameter resultSelector: Function to invoke for each value from the self combined
20
- /// with the latest value from the second source, if any.
21
- ///
22
- /// - returns: A publisher containing the result of combining each value of the self
23
- /// with the latest value from the second publisher, if any, using the
24
- /// specified result selector function.
25
- func withLatestFrom< Other: Publisher , Result> ( _ other: Other ,
26
- resultSelector: @escaping ( Output , Other . Output ) -> Result )
27
- -> AnyPublisher < Result , Failure >
28
- where Other. Failure == Failure {
29
- let upstream = share ( )
30
-
31
- return other
32
- . map { second in upstream. map { resultSelector ( $0, second) } }
33
- . switchToLatest ( )
34
- . zip ( upstream) // `zip`ping and discarding `\.1` allows for
35
- // upstream completions to be projected down immediately.
36
- . map ( \. 0 )
37
- . eraseToAnyPublisher ( )
38
- }
15
+ /// Merges two publishers into a single publisher by combining each value
16
+ /// from self with the latest value from the second publisher, if any.
17
+ ///
18
+ /// - parameter other: A second publisher source.
19
+ /// - parameter resultSelector: Function to invoke for each value from the self combined
20
+ /// with the latest value from the second source, if any.
21
+ ///
22
+ /// - returns: A publisher containing the result of combining each value of the self
23
+ /// with the latest value from the second publisher, if any, using the
24
+ /// specified result selector function.
25
+ func withLatestFrom< Other: Publisher , Result> ( _ other: Other ,
26
+ resultSelector: @escaping ( Output , Other . Output ) -> Result )
27
+ -> Publishers . WithLatestFrom < Self , Other , Result > {
28
+ return . init( upstream: self , second: other, resultSelector: resultSelector)
29
+ }
39
30
40
- /// Merges three publishers into a single publisher by combining each value
41
- /// from self with the latest value from the second and third publisher, if any.
42
- ///
43
- /// - parameter other: A second publisher source.
44
- /// - parameter other1: A third publisher source.
45
- /// - parameter resultSelector: Function to invoke for each value from the self combined
46
- /// with the latest value from the second and third source, if any.
47
- ///
48
- /// - returns: A publisher containing the result of combining each value of the self
49
- /// with the latest value from the second and third publisher, if any, using the
50
- /// specified result selector function.
51
- func withLatestFrom< Other: Publisher , Other1: Publisher , Result> ( _ other: Other ,
52
- _ other1: Other1 ,
53
- resultSelector: @escaping ( Output , ( Other . Output , Other1 . Output ) ) -> Result )
54
- -> AnyPublisher < Result , Failure >
31
+ /// Merges three publishers into a single publisher by combining each value
32
+ /// from self with the latest value from the second and third publisher, if any.
33
+ ///
34
+ /// - parameter other: A second publisher source.
35
+ /// - parameter other1: A third publisher source.
36
+ /// - parameter resultSelector: Function to invoke for each value from the self combined
37
+ /// with the latest value from the second and third source, if any.
38
+ ///
39
+ /// - returns: A publisher containing the result of combining each value of the self
40
+ /// with the latest value from the second and third publisher, if any, using the
41
+ /// specified result selector function.
42
+ func withLatestFrom< Other: Publisher , Other1: Publisher , Result> ( _ other: Other ,
43
+ _ other1: Other1 ,
44
+ resultSelector: @escaping ( Output , ( Other . Output , Other1 . Output ) ) -> Result )
45
+ -> Publishers . WithLatestFrom < Self , AnyPublisher < ( Other . Output , Other1 . Output ) , Self . Failure > , Result >
55
46
where Other. Failure == Failure , Other1. Failure == Failure {
56
- withLatestFrom ( other. combineLatest ( other1) , resultSelector: resultSelector)
57
- }
47
+ let combined = other. combineLatest ( other1)
48
+ . eraseToAnyPublisher ( )
49
+ return . init( upstream: self , second: combined, resultSelector: resultSelector)
50
+ }
58
51
59
- /// Merges four publishers into a single publisher by combining each value
60
- /// from self with the latest value from the second, third and fourth publisher, if any.
61
- ///
62
- /// - parameter other: A second publisher source.
63
- /// - parameter other1: A third publisher source.
64
- /// - parameter other2: A fourth publisher source.
65
- /// - parameter resultSelector: Function to invoke for each value from the self combined
66
- /// with the latest value from the second, third and fourth source, if any.
67
- ///
68
- /// - returns: A publisher containing the result of combining each value of the self
69
- /// with the latest value from the second, third and fourth publisher, if any, using the
70
- /// specified result selector function.
71
- func withLatestFrom< Other: Publisher , Other1: Publisher , Other2: Publisher , Result> ( _ other: Other ,
72
- _ other1: Other1 ,
73
- _ other2: Other2 ,
74
- resultSelector: @escaping ( Output , ( Other . Output , Other1 . Output , Other2 . Output ) ) -> Result )
75
- -> AnyPublisher < Result , Failure >
52
+ /// Merges four publishers into a single publisher by combining each value
53
+ /// from self with the latest value from the second, third and fourth publisher, if any.
54
+ ///
55
+ /// - parameter other: A second publisher source.
56
+ /// - parameter other1: A third publisher source.
57
+ /// - parameter other2: A fourth publisher source.
58
+ /// - parameter resultSelector: Function to invoke for each value from the self combined
59
+ /// with the latest value from the second, third and fourth source, if any.
60
+ ///
61
+ /// - returns: A publisher containing the result of combining each value of the self
62
+ /// with the latest value from the second, third and fourth publisher, if any, using the
63
+ /// specified result selector function.
64
+ func withLatestFrom< Other: Publisher , Other1: Publisher , Other2: Publisher , Result> ( _ other: Other ,
65
+ _ other1: Other1 ,
66
+ _ other2: Other2 ,
67
+ resultSelector: @escaping ( Output , ( Other . Output , Other1 . Output , Other2 . Output ) ) -> Result )
68
+ -> Publishers . WithLatestFrom < Self , AnyPublisher < ( Other . Output , Other1 . Output , Other2 . Output ) , Self . Failure > , Result >
76
69
where Other. Failure == Failure , Other1. Failure == Failure , Other2. Failure == Failure {
77
- withLatestFrom ( other. combineLatest ( other1, other2) , resultSelector: resultSelector)
70
+ let combined = other. combineLatest ( other1, other2)
71
+ . eraseToAnyPublisher ( )
72
+ return . init( upstream: self , second: combined, resultSelector: resultSelector)
73
+ }
74
+
75
+ /// Upon an emission from self, emit the latest value from the
76
+ /// second publisher, if any exists.
77
+ ///
78
+ /// - parameter other: A second publisher source.
79
+ ///
80
+ /// - returns: A publisher containing the latest value from the second publisher, if any.
81
+ func withLatestFrom< Other: Publisher > ( _ other: Other )
82
+ -> Publishers . WithLatestFrom < Self , Other , Other . Output > {
83
+ return . init( upstream: self , second: other) { $1 }
84
+ }
85
+
86
+ /// Upon an emission from self, emit the latest value from the
87
+ /// second and third publisher, if any exists.
88
+ ///
89
+ /// - parameter other: A second publisher source.
90
+ /// - parameter other1: A third publisher source.
91
+ ///
92
+ /// - returns: A publisher containing the latest value from the second and third publisher, if any.
93
+ func withLatestFrom< Other: Publisher , Other1: Publisher > ( _ other: Other ,
94
+ _ other1: Other1 )
95
+ -> Publishers . WithLatestFrom < Self , AnyPublisher < ( Other . Output , Other1 . Output ) , Self . Failure > , ( Other . Output , Other1 . Output ) >
96
+ where Other. Failure == Failure , Other1. Failure == Failure {
97
+ withLatestFrom ( other, other1) { $1 }
98
+ }
99
+
100
+ /// Upon an emission from self, emit the latest value from the
101
+ /// second, third and forth publisher, if any exists.
102
+ ///
103
+ /// - parameter other: A second publisher source.
104
+ /// - parameter other1: A third publisher source.
105
+ /// - parameter other2: A forth publisher source.
106
+ ///
107
+ /// - returns: A publisher containing the latest value from the second, third and forth publisher, if any.
108
+ func withLatestFrom< Other: Publisher , Other1: Publisher , Other2: Publisher > ( _ other: Other ,
109
+ _ other1: Other1 ,
110
+ _ other2: Other2 )
111
+ -> Publishers . WithLatestFrom < Self , AnyPublisher < ( Other . Output , Other1 . Output , Other2 . Output ) , Self . Failure > , ( Other . Output , Other1 . Output , Other2 . Output ) >
112
+ where Other. Failure == Failure , Other1. Failure == Failure , Other2. Failure == Failure {
113
+ withLatestFrom ( other, other1, other2) { $1 }
114
+ }
115
+ }
116
+
117
+ // MARK: - Publisher
118
+ @available ( OSX 10 . 15 , iOS 13 . 0 , tvOS 13 . 0 , watchOS 6 . 0 , * )
119
+ public extension Publishers {
120
+ struct WithLatestFrom < Upstream: Publisher ,
121
+ Other: Publisher ,
122
+ Output> : Publisher where Upstream. Failure == Other . Failure {
123
+ public typealias Failure = Upstream . Failure
124
+ public typealias ResultSelector = ( Upstream . Output , Other . Output ) -> Output
125
+
126
+ private let upstream : Upstream
127
+ private let second : Other
128
+ private let resultSelector : ResultSelector
129
+ private var latestValue : Other . Output ?
130
+
131
+ init ( upstream: Upstream ,
132
+ second: Other ,
133
+ resultSelector: @escaping ResultSelector ) {
134
+ self . upstream = upstream
135
+ self . second = second
136
+ self . resultSelector = resultSelector
78
137
}
79
138
80
- /// Upon an emission from self, emit the latest value from the
81
- /// second publisher, if any exists.
82
- ///
83
- /// - parameter other: A second publisher source.
84
- ///
85
- /// - returns: A publisher containing the latest value from the second publisher, if any.
86
- func withLatestFrom< Other: Publisher > ( _ other: Other )
87
- -> AnyPublisher < Other . Output , Failure >
88
- where Other. Failure == Failure {
89
- withLatestFrom ( other) { $1 }
139
+ public func receive< S: Subscriber > ( subscriber: S ) where Failure == S . Failure , Output == S . Input {
140
+ subscriber. receive ( subscription: Subscription ( upstream: upstream,
141
+ downstream: subscriber,
142
+ second: second,
143
+ resultSelector: resultSelector) )
90
144
}
145
+ }
146
+ }
91
147
92
- /// Upon an emission from self, emit the latest value from the
93
- /// second and third publisher, if any exists.
94
- ///
95
- /// - parameter other: A second publisher source.
96
- /// - parameter other1: A third publisher source.
97
- ///
98
- /// - returns: A publisher containing the latest value from the second and third publisher, if any.
99
- func withLatestFrom< Other: Publisher , Other1: Publisher > ( _ other: Other ,
100
- _ other1: Other1 )
101
- -> AnyPublisher < ( Other . Output , Other1 . Output ) , Failure >
102
- where Other. Failure == Failure , Other1. Failure == Failure {
103
- withLatestFrom ( other, other1) { $1 }
148
+ // MARK: - Subscription
149
+ @available ( OSX 10 . 15 , iOS 13 . 0 , tvOS 13 . 0 , watchOS 6 . 0 , * )
150
+ private extension Publishers . WithLatestFrom {
151
+ class Subscription < Downstream: Subscriber > : Combine . Subscription , CustomStringConvertible where Downstream. Input == Output , Downstream. Failure == Failure {
152
+ private let resultSelector : ResultSelector
153
+ private var sink : Sink < Upstream , Downstream > ?
154
+
155
+ private let upstream : Upstream
156
+ private let downstream : Downstream
157
+ private let second : Other
158
+
159
+ // Secondary (other) publisher
160
+ private var latestValue : Other . Output ?
161
+ private var otherSubscription : Cancellable ?
162
+ private var preInitialDemand = Subscribers . Demand. none
163
+
164
+ init ( upstream: Upstream ,
165
+ downstream: Downstream ,
166
+ second: Other ,
167
+ resultSelector: @escaping ResultSelector ) {
168
+ self . upstream = upstream
169
+ self . second = second
170
+ self . downstream = downstream
171
+ self . resultSelector = resultSelector
172
+
173
+ trackLatestFromSecond { [ weak self] in
174
+ guard let self = self else { return }
175
+ self . request ( self . preInitialDemand)
176
+ self . preInitialDemand = . none
177
+ }
104
178
}
105
179
106
- /// Upon an emission from self, emit the latest value from the
107
- /// second, third and forth publisher, if any exists.
108
- ///
109
- /// - parameter other: A second publisher source.
110
- /// - parameter other1: A third publisher source.
111
- /// - parameter other2: A forth publisher source.
112
- ///
113
- /// - returns: A publisher containing the latest value from the second, third and forth publisher, if any.
114
- func withLatestFrom< Other: Publisher , Other1: Publisher , Other2: Publisher > ( _ other: Other ,
115
- _ other1: Other1 ,
116
- _ other2: Other2 )
117
- -> AnyPublisher < ( Other . Output , Other1 . Output , Other2 . Output ) , Failure >
118
- where Other. Failure == Failure , Other1. Failure == Failure , Other2. Failure == Failure {
119
- withLatestFrom ( other, other1, other2) { $1 }
180
+ func request( _ demand: Subscribers . Demand ) {
181
+ guard latestValue != nil else {
182
+ preInitialDemand += demand
183
+ return
184
+ }
185
+
186
+ self . sink? . demand ( demand)
187
+ }
188
+
189
+ // Create an internal subscription to the `Other` publisher,
190
+ // constantly tracking its latest value
191
+ private func trackLatestFromSecond( onInitialValue: @escaping ( ) -> Void ) {
192
+ var gotInitialValue = false
193
+
194
+ let subscriber = AnySubscriber < Other . Output , Other . Failure > (
195
+ receiveSubscription: { [ weak self] subscription in
196
+ self ? . otherSubscription = subscription
197
+ subscription. request ( . unlimited)
198
+ } ,
199
+ receiveValue: { [ weak self] value in
200
+ guard let self = self else { return . none }
201
+ self . latestValue = value
202
+
203
+ if !gotInitialValue {
204
+ // When getting initial value, start pulling values
205
+ // from upstream in the main sink
206
+ self . sink = Sink ( upstream: self . upstream,
207
+ downstream: self . downstream,
208
+ transformOutput: { [ weak self] value in
209
+ guard let self = self ,
210
+ let other = self . latestValue else { return nil }
211
+
212
+ return self . resultSelector ( value, other)
213
+ } ,
214
+ transformFailure: { $0 } )
215
+
216
+ // Signal initial value to start fulfilling downstream demand
217
+ gotInitialValue = true
218
+ onInitialValue ( )
219
+ }
220
+
221
+ return . unlimited
222
+ } ,
223
+ receiveCompletion: nil )
224
+
225
+ self . second. subscribe ( subscriber)
226
+ }
227
+
228
+ var description : String {
229
+ return " WithLatestFrom.Subscription< \( Output . self) , \( Failure . self) > "
230
+ }
231
+
232
+ func cancel( ) {
233
+ sink = nil
234
+ otherSubscription? . cancel ( )
120
235
}
236
+ }
121
237
}
122
238
#endif
0 commit comments