Skip to content

Commit 7db913f

Browse files
authored
Add Create effect that creates an asynchronous stream (#71)
1 parent 5fb07c2 commit 7db913f

File tree

3 files changed

+151
-0
lines changed

3 files changed

+151
-0
lines changed

Sources/OneWay/AnyEffect.swift

+21
Original file line numberDiff line numberDiff line change
@@ -246,4 +246,25 @@ extension AnyEffect {
246246
build()
247247
).eraseToAnyEffect()
248248
}
249+
250+
/// An effect that creates an asynchronous stream.
251+
///
252+
/// - Parameters:
253+
/// - bufferingPolicy: A `Continuation.BufferingPolicy` value to set the stream's buffering
254+
/// behavior. By default, the stream buffers an unlimited number of elements. You can also set
255+
/// the policy to buffer a specified number of oldest or newest elements.
256+
/// - build: A custom closure that yields values to the `AsyncStream`. This closure receives
257+
/// an `AsyncStream.Continuation` instance that it uses to provide elements to the stream and
258+
/// terminate the stream when finished.
259+
/// - Returns: A new effect.
260+
@inlinable
261+
public static func create(
262+
bufferingPolicy: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
263+
build: @escaping (AsyncStream<Element>.Continuation) -> Void
264+
) -> AnyEffect<Element> {
265+
Effects.Create(
266+
bufferingPolicy: bufferingPolicy,
267+
build: build
268+
).eraseToAnyEffect()
269+
}
249270
}

Sources/OneWay/Effect.swift

+25
Original file line numberDiff line numberDiff line change
@@ -203,4 +203,29 @@ public enum Effects {
203203
}
204204
}
205205
}
206+
207+
/// An effect that creates an asynchronous stream.
208+
public struct Create<Element>: Effect where Element: Sendable {
209+
private let stream: AsyncStream<Element>
210+
211+
/// Initializes a `Create` effect.
212+
///
213+
/// - Parameters:
214+
/// - bufferingPolicy: A `Continuation.BufferingPolicy` value to set the stream's
215+
/// buffering behavior. By default, the stream buffers an unlimited number of elements.
216+
/// You can also set the policy to buffer a specified number of oldest or newest elements.
217+
/// - build: A custom closure that yields values to the `AsyncStream`. This closure
218+
/// receives an `AsyncStream.Continuation` instance that it uses to provide elements to
219+
/// the stream and terminate the stream when finished.
220+
public init(
221+
bufferingPolicy: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
222+
build: @escaping (AsyncStream<Element>.Continuation) -> Void
223+
) {
224+
self.stream = AsyncStream<Element>(bufferingPolicy: bufferingPolicy, build)
225+
}
226+
227+
public var values: AsyncStream<Element> {
228+
stream
229+
}
230+
}
206231
}

Tests/OneWayTests/EffectTests.swift

+105
Original file line numberDiff line numberDiff line change
@@ -301,4 +301,109 @@ final class EffectTests: XCTestCase {
301301
]
302302
)
303303
}
304+
305+
func test_createSynchronously() async {
306+
let values = Effects.Create { continuation in
307+
continuation.yield(Action.first)
308+
continuation.yield(Action.second)
309+
continuation.yield(Action.third)
310+
continuation.yield(Action.fourth)
311+
continuation.yield(Action.fifth)
312+
continuation.finish()
313+
}.values
314+
315+
var result: [Action] = []
316+
for await value in values {
317+
result.append(value)
318+
}
319+
320+
XCTAssertEqual(
321+
result,
322+
[
323+
.first,
324+
.second,
325+
.third,
326+
.fourth,
327+
.fifth,
328+
]
329+
)
330+
}
331+
332+
func test_createAsynchronously() async {
333+
let clock = TestClock()
334+
335+
let values = Effects.Create { continuation in
336+
Task { @MainActor in
337+
try! await clock.sleep(for: .seconds(100))
338+
continuation.yield(Action.first)
339+
continuation.yield(Action.second)
340+
}
341+
Task { @MainActor in
342+
try! await clock.sleep(for: .seconds(200))
343+
continuation.yield(Action.third)
344+
continuation.yield(Action.fourth)
345+
continuation.yield(Action.fifth)
346+
}
347+
Task { @MainActor in
348+
try! await clock.sleep(for: .seconds(300))
349+
continuation.finish()
350+
}
351+
}.values
352+
353+
var result: [Action] = []
354+
await clock.advance(by: .seconds(300))
355+
for await value in values {
356+
result.append(value)
357+
}
358+
359+
XCTAssertEqual(
360+
result,
361+
[
362+
.first,
363+
.second,
364+
.third,
365+
.fourth,
366+
.fifth,
367+
]
368+
)
369+
}
370+
371+
func test_createAsynchronouslyWithCompletionHandler() async {
372+
let values = Effects.Create { continuation in
373+
perform { action in
374+
continuation.yield(action)
375+
if action == .fifth {
376+
continuation.finish()
377+
}
378+
}
379+
}.values
380+
381+
var result: [Action] = []
382+
for await value in values {
383+
result.append(value)
384+
}
385+
386+
XCTAssertEqual(
387+
result,
388+
[
389+
.first,
390+
.second,
391+
.third,
392+
.fourth,
393+
.fifth,
394+
]
395+
)
396+
397+
func perform(completionHandler: @Sendable @escaping (Action) -> Void) {
398+
DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) {
399+
completionHandler(.first)
400+
completionHandler(.second)
401+
}
402+
DispatchQueue.main.asyncAfter(deadline: .now() + 0.2) {
403+
completionHandler(.third)
404+
completionHandler(.fourth)
405+
completionHandler(.fifth)
406+
}
407+
}
408+
}
304409
}

0 commit comments

Comments
 (0)