Skip to content

Commit 94eb532

Browse files
Add support for multiple pMapSkip's (#52)
Co-authored-by: Sindre Sorhus <[email protected]>
1 parent e7ca665 commit 94eb532

File tree

3 files changed

+104
-9
lines changed

3 files changed

+104
-9
lines changed

index.js

+23-9
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export default async function pMap(
2323

2424
const result = [];
2525
const errors = [];
26-
const skippedIndexes = [];
26+
const skippedIndexesMap = new Map();
2727
let isRejected = false;
2828
let isResolved = false;
2929
let isIterableDone = false;
@@ -59,15 +59,28 @@ export default async function pMap(
5959
if (resolvingCount === 0 && !isResolved) {
6060
if (!stopOnError && errors.length > 0) {
6161
reject(new AggregateError(errors));
62-
} else {
63-
isResolved = true;
62+
return;
63+
}
6464

65-
for (const skippedIndex of skippedIndexes) {
66-
result.splice(skippedIndex, 1);
67-
}
65+
isResolved = true;
6866

67+
if (!skippedIndexesMap.size) {
6968
resolve(result);
69+
return;
7070
}
71+
72+
const pureResult = [];
73+
74+
// Support multiple `pMapSkip`'s.
75+
for (const [index, value] of result.entries()) {
76+
if (skippedIndexesMap.get(index) === pMapSkip) {
77+
continue;
78+
}
79+
80+
pureResult.push(value);
81+
}
82+
83+
resolve(pureResult);
7184
}
7285

7386
return;
@@ -86,12 +99,13 @@ export default async function pMap(
8699

87100
const value = await mapper(element, index);
88101

102+
// Use Map to stage the index of the element.
89103
if (value === pMapSkip) {
90-
skippedIndexes.push(index);
91-
} else {
92-
result[index] = value;
104+
skippedIndexesMap.set(index, value);
93105
}
94106

107+
result[index] = value;
108+
95109
resolvingCount--;
96110
await next();
97111
} catch (error) {
+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import test from 'ava';
2+
import inRange from 'in-range';
3+
import timeSpan from 'time-span';
4+
import pMap, {pMapSkip} from './index.js';
5+
6+
function generateSkipPerformanceData(length) {
7+
const data = [];
8+
for (let index = 0; index < length; index++) {
9+
data.push(pMapSkip);
10+
}
11+
12+
return data;
13+
}
14+
15+
test('multiple pMapSkips - algorithmic complexity', async t => {
16+
const testData = [generateSkipPerformanceData(1000), generateSkipPerformanceData(10000), generateSkipPerformanceData(100000)];
17+
const testDurationsMS = [];
18+
19+
for (const data of testData) {
20+
const end = timeSpan();
21+
// eslint-disable-next-line no-await-in-loop
22+
await pMap(data, async value => value);
23+
testDurationsMS.push(end());
24+
}
25+
26+
for (let index = 0; index < testDurationsMS.length - 1; index++) {
27+
// Time for 10x more items should take between 9x and 11x more time.
28+
const smallerDuration = testDurationsMS[index];
29+
const longerDuration = testDurationsMS[index + 1];
30+
31+
// The longer test needs to be a little longer and also not 10x more than the
32+
// shorter test. This is not perfect... there is some fluctuation.
33+
// The idea here is to catch a regression that makes `pMapSkip` handling O(n^2)
34+
// on the number of `pMapSkip` items in the input.
35+
t.true(inRange(longerDuration, {start: 1.2 * smallerDuration, end: 15 * smallerDuration}));
36+
}
37+
});

test.js

+44
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,28 @@ test('pMapSkip', async t => {
155155
], async value => value), [1, 2]);
156156
});
157157

158+
test('multiple pMapSkips', async t => {
159+
t.deepEqual(await pMap([
160+
1,
161+
pMapSkip,
162+
2,
163+
pMapSkip,
164+
3,
165+
pMapSkip,
166+
pMapSkip,
167+
4
168+
], async value => value), [1, 2, 3, 4]);
169+
});
170+
171+
test('all pMapSkips', async t => {
172+
t.deepEqual(await pMap([
173+
pMapSkip,
174+
pMapSkip,
175+
pMapSkip,
176+
pMapSkip
177+
], async value => value), []);
178+
});
179+
158180
test('all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
159181
const input = [1, async () => delay(300, {value: 2}), 3];
160182
const mappedValues = [];
@@ -269,6 +291,28 @@ test('asyncIterator - pMapSkip', async t => {
269291
]), async value => value), [1, 2]);
270292
});
271293

294+
test('asyncIterator - multiple pMapSkips', async t => {
295+
t.deepEqual(await pMap(new AsyncTestData([
296+
1,
297+
pMapSkip,
298+
2,
299+
pMapSkip,
300+
3,
301+
pMapSkip,
302+
pMapSkip,
303+
4
304+
]), async value => value), [1, 2, 3, 4]);
305+
});
306+
307+
test('asyncIterator - all pMapSkips', async t => {
308+
t.deepEqual(await pMap(new AsyncTestData([
309+
pMapSkip,
310+
pMapSkip,
311+
pMapSkip,
312+
pMapSkip
313+
]), async value => value), []);
314+
});
315+
272316
test('asyncIterator - all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
273317
const input = [1, async () => delay(300, {value: 2}), 3];
274318
const mappedValues = [];

0 commit comments

Comments
 (0)