Skip to content

Commit bfc1a02

Browse files
authored
Adds Collection.flatMapBatches(of:). (#73)
* Adds `Collection.batchedSubscribe(by:)`. h/t to @freak4pc and @natecook1000 for this implementation from a thread in iOS Folks. πŸ‘πŸ½ * `Collection.batchedSubscribe(by:)` β‡’ `.flatMapBatches(of)`.
1 parent 0605a06 commit bfc1a02

File tree

4 files changed

+237
-0
lines changed

4 files changed

+237
-0
lines changed

β€ŽCombineExt.xcodeproj/project.pbxproj

+8
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
BF3D3B67253B88E500D830ED /* IgnoreFailureTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF3D3B66253B88E500D830ED /* IgnoreFailureTests.swift */; };
3232
BF43CC1525008B4F005AFA28 /* IgnoreOutputSetOutputType.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF43CC1425008B4F005AFA28 /* IgnoreOutputSetOutputType.swift */; };
3333
BF43CC1725008C45005AFA28 /* IgnoreOutputSetOutputTypeTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF43CC1625008C45005AFA28 /* IgnoreOutputSetOutputTypeTests.swift */; };
34+
BFADDC8125BCE4C200465E9B /* FlatMapBatches.swift in Sources */ = {isa = PBXBuildFile; fileRef = BFADDC8025BCE4C200465E9B /* FlatMapBatches.swift */; };
35+
BFADDC8B25BCE91E00465E9B /* FlatMapBatchesTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = BFADDC8A25BCE91E00465E9B /* FlatMapBatchesTests.swift */; };
3436
C387777C24E6BBE900FAD2D8 /* Nwise.swift in Sources */ = {isa = PBXBuildFile; fileRef = C387777B24E6BBE900FAD2D8 /* Nwise.swift */; };
3537
C387777F24E6BF8F00FAD2D8 /* NwiseTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C387777D24E6BF6C00FAD2D8 /* NwiseTests.swift */; };
3638
D836234824EA9446002353AC /* MergeMany.swift in Sources */ = {isa = PBXBuildFile; fileRef = D836234724EA9446002353AC /* MergeMany.swift */; };
@@ -112,6 +114,8 @@
112114
BF3D3B66253B88E500D830ED /* IgnoreFailureTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = IgnoreFailureTests.swift; sourceTree = "<group>"; };
113115
BF43CC1425008B4F005AFA28 /* IgnoreOutputSetOutputType.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = IgnoreOutputSetOutputType.swift; sourceTree = "<group>"; };
114116
BF43CC1625008C45005AFA28 /* IgnoreOutputSetOutputTypeTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = IgnoreOutputSetOutputTypeTests.swift; sourceTree = "<group>"; };
117+
BFADDC8025BCE4C200465E9B /* FlatMapBatches.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FlatMapBatches.swift; sourceTree = "<group>"; };
118+
BFADDC8A25BCE91E00465E9B /* FlatMapBatchesTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FlatMapBatchesTests.swift; sourceTree = "<group>"; };
115119
C387777B24E6BBE900FAD2D8 /* Nwise.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Nwise.swift; sourceTree = "<group>"; };
116120
C387777D24E6BF6C00FAD2D8 /* NwiseTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NwiseTests.swift; sourceTree = "<group>"; };
117121
"CombineExt::CombineExt::Product" /* CombineExt.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; path = CombineExt.framework; sourceTree = BUILT_PRODUCTS_DIR; };
@@ -256,6 +260,7 @@
256260
OBJ_32 /* WithLatestFrom.swift */,
257261
OBJ_33 /* ZipMany.swift */,
258262
1970A8A925246FBD00799AB6 /* FilterMany.swift */,
263+
BFADDC8025BCE4C200465E9B /* FlatMapBatches.swift */,
259264
);
260265
path = Operators;
261266
sourceTree = "<group>";
@@ -307,6 +312,7 @@
307312
OBJ_59 /* ToggleTests.swift */,
308313
OBJ_60 /* WithLatestFromTests.swift */,
309314
OBJ_61 /* ZipManyTests.swift */,
315+
BFADDC8A25BCE91E00465E9B /* FlatMapBatchesTests.swift */,
310316
);
311317
path = Tests;
312318
sourceTree = SOURCE_ROOT;
@@ -562,6 +568,7 @@
562568
OBJ_137 /* ReplaySubjectTests.swift in Sources */,
563569
OBJ_138 /* SetOutputTypeTests.swift in Sources */,
564570
OBJ_139 /* ShareReplayTests.swift in Sources */,
571+
BFADDC8B25BCE91E00465E9B /* FlatMapBatchesTests.swift in Sources */,
565572
OBJ_140 /* ToggleTests.swift in Sources */,
566573
OBJ_141 /* WithLatestFromTests.swift in Sources */,
567574
BF43CC1725008C45005AFA28 /* IgnoreOutputSetOutputTypeTests.swift in Sources */,
@@ -584,6 +591,7 @@
584591
OBJ_86 /* AssignToMany.swift in Sources */,
585592
BF3D3B5D253B83F300D830ED /* IgnoreFailure.swift in Sources */,
586593
BF330EFB24F20080001281FC /* Lock.swift in Sources */,
594+
BFADDC8125BCE4C200465E9B /* FlatMapBatches.swift in Sources */,
587595
OBJ_87 /* CombineLatestMany.swift in Sources */,
588596
OBJ_88 /* Create.swift in Sources */,
589597
OBJ_89 /* Dematerialize.swift in Sources */,

β€ŽREADME.md

+36
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ All operators, utilities and helpers respect Combine's publisher contract, inclu
4242
* [nwise(_:) and pairwise()](#nwise)
4343
* [ignoreOutput(setOutputType:)](#ignoreOutputsetOutputType)
4444
* [ignoreFailure](#ignoreFailure)
45+
* [flatMapBatches(of:)](#flatMapBatchesof)
4546

4647
### Publishers
4748
* [AnyPublisher.create](#AnypublisherCreate)
@@ -504,6 +505,8 @@ subscription = [1, 1, 2, 1, 3, 3, 4].publisher
504505
.sink(receiveValue: { print("removeAllDuplicates: \($0)") })
505506
```
506507

508+
#### Output:
509+
507510
```none
508511
removeAllDuplicates: 1
509512
removeAllDuplicates: 2
@@ -535,6 +538,8 @@ subscription2 = replayedPublisher
535538
.sink(receiveValue: { print("second subscriber: \($0)") })
536539
```
537540

541+
#### Output:
542+
538543
```none
539544
first subscriber: 1
540545
first subscriber: 2
@@ -565,6 +570,8 @@ DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
565570
}
566571
```
567572

573+
#### Output:
574+
568575
```none
569576
1
570577
2
@@ -587,6 +594,8 @@ subject.send(false)
587594
subject.send(true)
588595
```
589596

597+
#### Output:
598+
590599
```none
591600
false
592601
true
@@ -611,6 +620,8 @@ subject.send(4)
611620
subject.send(5)
612621
```
613622

623+
#### Output:
624+
614625
```none
615626
[1, 2, 3]
616627
[2, 3, 4]
@@ -635,6 +646,8 @@ subject.send(4)
635646
subject.send(5)
636647
```
637648

649+
#### Output:
650+
638651
```none
639652
1 -> 2
640653
2 -> 3
@@ -676,13 +689,36 @@ subject.send(3)
676689
subject.send(completion: .failure(.someError))
677690
```
678691

692+
#### Output:
693+
679694
```none
680695
1
681696
2
682697
3
683698
.finished
684699
```
685700

701+
### flatMapBatches(of:)
702+
703+
`Collection.flatMapBatches(of:)` subscribes to the receiver’s contained publishers in batches and returns their outputs in batches, too (while maintaining order). Subsequent batches of publishers are only subscribed to when prior batches successfully complete β€”Β any one failure is forwarded downstream.
704+
705+
```swift
706+
let ints = (1...6).map(Just.init)
707+
708+
subscription = ints
709+
.flatMapBatches(of: 2)
710+
.sink(receiveCompletion: { print($0) }, receiveValue: { print($0) })
711+
```
712+
713+
#### Output:
714+
715+
```none
716+
[1, 2]
717+
[3, 4]
718+
[5, 6]
719+
.finished
720+
```
721+
686722
## Publishers
687723

688724
This section outlines some of the custom Combine publishers CombineExt provides
+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
//
2+
// FlatMapBatches.swift
3+
// CombineExt
4+
//
5+
// Created by Shai Mishali, Nate Cook, and Jasdev Singh on 21/01/2021.
6+
// Copyright Β© 2021 Combine Community. All rights reserved.
7+
//
8+
9+
#if canImport(Combine)
10+
import Combine
11+
12+
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
13+
public extension Collection where Element: Publisher {
14+
/// Subscribes to the receiver’s contained publishers `size` at a time
15+
/// and outputs their results in `size`-sized batches, while maintaining
16+
/// order within each batch β€” subsequent batches of publishers are only
17+
/// subscribed to when the batch before it successfully completes. Any
18+
/// one failure will be forwarded downstream.
19+
/// - Parameter size: The batch size.
20+
/// - Returns: A publisher that subscribes to `self`’s contained publishers
21+
/// `size` at a time, returning their results in-order in `size`-sized
22+
/// batches, and then repeats with subsequent batches only if the ones prior
23+
/// successfully completed. Any one failure is immediately forwarded downstream.
24+
func flatMapBatches(of size: Int) -> AnyPublisher<[Element.Output], Element.Failure> {
25+
precondition(size > 0, "Batch sizes must be positive.")
26+
27+
let indexBreaks = sequence(
28+
first: startIndex,
29+
next: {
30+
$0 == endIndex ?
31+
nil :
32+
index($0, offsetBy: size, limitedBy: endIndex)
33+
?? endIndex
34+
}
35+
)
36+
37+
return Swift.zip(indexBreaks, indexBreaks.dropFirst())
38+
.publisher
39+
.setFailureType(to: Element.Failure.self)
40+
.flatMap(maxPublishers: .max(1)) { self[$0..<$1].zip() }
41+
.eraseToAnyPublisher()
42+
}
43+
}
44+
#endif

β€ŽTests/FlatMapBatchesTests.swift

+149
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
//
2+
// FlatMapBatchesTests.swift
3+
// CombineExtTests
4+
//
5+
// Created by Jasdev Singh on 23/01/2021.
6+
// Copyright Β© 2021 Combine Community. All rights reserved.
7+
//
8+
9+
#if !os(watchOS)
10+
import XCTest
11+
import Combine
12+
import CombineExt
13+
14+
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
15+
final class FlatMapBatchesTests: XCTestCase {
16+
private var subscription: AnyCancellable!
17+
18+
private enum BatchedSubscribeError: Error, Equatable {
19+
case anError
20+
}
21+
22+
func testEvenBatches() {
23+
let ints = (1...6).map(Just.init)
24+
25+
var results = [[Int]]()
26+
var completed = false
27+
28+
subscription = ints
29+
.flatMapBatches(of: 2)
30+
.sink(receiveCompletion: { _ in completed = true },
31+
receiveValue: { results.append($0) })
32+
33+
XCTAssertEqual(results, [[1, 2], [3, 4], [5, 6]])
34+
XCTAssertTrue(completed)
35+
}
36+
37+
func testUnevenBatches() {
38+
let ints = (1...5).map(Just.init)
39+
40+
var results = [[Int]]()
41+
var completed = false
42+
43+
subscription = ints
44+
.flatMapBatches(of: 2)
45+
.sink(receiveCompletion: { _ in completed = true },
46+
receiveValue: { results.append($0) })
47+
48+
XCTAssertEqual(results, [[1, 2], [3, 4], [5]])
49+
XCTAssertTrue(completed)
50+
}
51+
52+
func testForwardsError() {
53+
let publishers = [Fail(error: BatchedSubscribeError.anError).eraseToAnyPublisher()] +
54+
(1...3).map {
55+
Just($0)
56+
.setFailureType(to: BatchedSubscribeError.self)
57+
.eraseToAnyPublisher()
58+
}
59+
60+
var results = [[Int]]()
61+
var completion: Subscribers.Completion<BatchedSubscribeError>?
62+
63+
subscription = publishers
64+
.flatMapBatches(of: 2)
65+
.sink(receiveCompletion: { completion = $0 },
66+
receiveValue: { results.append($0) })
67+
68+
XCTAssertTrue(results.isEmpty)
69+
XCTAssertEqual(completion, .failure(.anError))
70+
}
71+
72+
func testHangsIfEarlierBatchDoesntComplete() {
73+
let uncompleted = (1...2).map { number in
74+
AnyPublisher<Int, Never>.create { subscriber in
75+
subscriber.send(number)
76+
return AnyCancellable { }
77+
}
78+
}
79+
80+
let publishers = uncompleted +
81+
(3...4).map(Just.init).map(AnyPublisher.init)
82+
83+
var results = [[Int]]()
84+
var completed = false
85+
86+
subscription = publishers
87+
.flatMapBatches(of: 2)
88+
.sink(receiveCompletion: { _ in completed = true },
89+
receiveValue: { results.append($0) })
90+
91+
XCTAssertEqual(results, [[1, 2]])
92+
XCTAssertFalse(completed)
93+
}
94+
95+
func testEmptyCollection() {
96+
let publishers = EmptyCollection<AnyPublisher<Int, Never>>()
97+
98+
var results = [[Int]]()
99+
var completed = false
100+
101+
subscription = publishers
102+
.flatMapBatches(of: 2)
103+
.sink(receiveCompletion: { _ in completed = true },
104+
receiveValue: { results.append($0) })
105+
106+
XCTAssertTrue(results.isEmpty)
107+
XCTAssertTrue(completed)
108+
}
109+
110+
func testBatchLimitLargerThanCount() {
111+
let ints = [Just(1)]
112+
113+
var results = [[Int]]()
114+
var completed = false
115+
116+
subscription = ints
117+
.flatMapBatches(of: 2)
118+
.sink(receiveCompletion: { _ in completed = true },
119+
receiveValue: { results.append($0) })
120+
121+
XCTAssertEqual(results, [[1]])
122+
XCTAssertTrue(completed)
123+
}
124+
125+
func testMultipleOutputsPerPublisher() {
126+
let publishers = (1...2).map { number in
127+
AnyPublisher<Int, Never>.create { subscriber in
128+
subscriber.send(number)
129+
subscriber.send(number)
130+
subscriber.send(completion: .finished)
131+
132+
return AnyCancellable { }
133+
}
134+
} +
135+
(3...4).map(Just.init).map(AnyPublisher.init)
136+
137+
var results = [[Int]]()
138+
var completed = false
139+
140+
subscription = publishers
141+
.flatMapBatches(of: 2)
142+
.sink(receiveCompletion: { _ in completed = true },
143+
receiveValue: { results.append($0) })
144+
145+
XCTAssertEqual(results, [[1, 2], [1, 2], [3, 4]])
146+
XCTAssertTrue(completed)
147+
}
148+
}
149+
#endif

0 commit comments

Comments
Β (0)