Skip to content

Commit d5a0b26

Browse files
committed
Prevent skip removal logic from being run twice
1 parent 8477a7a commit d5a0b26

File tree

2 files changed

+40
-20
lines changed

2 files changed

+40
-20
lines changed

index.js

+15-14
Original file line numberDiff line numberDiff line change
@@ -21,42 +21,43 @@ export default async function pMap(
2121
const errors = [];
2222
const skippedIndexes = [];
2323
let isRejected = false;
24+
let isRejectedOrResolved = false;
2425
let isIterableDone = false;
2526
let resolvingCount = 0;
2627
let currentIndex = 0;
27-
let asyncIterator = false;
28-
let iterator;
29-
30-
if (iterable[Symbol.iterator] === undefined) {
31-
// We've got an async iterable
32-
iterator = iterable[Symbol.asyncIterator]();
33-
asyncIterator = true;
34-
} else {
35-
iterator = iterable[Symbol.iterator]();
36-
}
28+
const iterator = iterable[Symbol.iterator] === undefined ? iterable[Symbol.asyncIterator]() : iterable[Symbol.iterator]();
3729

3830
const reject = reason => {
3931
isRejected = true;
32+
isRejectedOrResolved = true;
4033
reject_(reason);
4134
};
4235

4336
const next = async () => {
44-
if (isRejected) {
37+
if (isRejectedOrResolved) {
4538
return;
4639
}
4740

48-
const nextItem = asyncIterator ? await iterator.next() : iterator.next();
41+
const nextItem = await iterator.next();
4942

5043
const index = currentIndex;
5144
currentIndex++;
5245

46+
// Note: iterator.next() can be called many times in parallel.
47+
// This can cause multiple calls to this next() function to
48+
// receive a `nextItem` with `done === true`.
49+
// The shutdown logic that rejects/resolves must be protected
50+
// so it runs only one time as the `skippedIndex` logic is
51+
// non-idempotent.
5352
if (nextItem.done) {
5453
isIterableDone = true;
5554

56-
if (resolvingCount === 0) {
55+
if (resolvingCount === 0 && !isRejectedOrResolved) {
5756
if (!stopOnError && errors.length > 0) {
5857
reject(new AggregateError(errors));
5958
} else {
59+
isRejectedOrResolved = true;
60+
6061
for (const skippedIndex of skippedIndexes) {
6162
result.splice(skippedIndex, 1);
6263
}
@@ -75,7 +76,7 @@ export default async function pMap(
7576
try {
7677
const element = await nextItem.value;
7778

78-
if (isRejected) {
79+
if (isRejectedOrResolved) {
7980
return;
8081
}
8182

test.js

+25-6
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ test('pMapSkip', async t => {
157157
], async value => value), [1, 2]);
158158
});
159159

160-
test('do not run mapping after stop-on-error happened', async t => {
160+
test('all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
161161
const input = [1, async () => delay(300, {value: 2}), 3];
162162
const mappedValues = [];
163163
await t.throwsAsync(
@@ -271,7 +271,7 @@ test('asyncIterator - pMapSkip', async t => {
271271
]), async value => value), [1, 2]);
272272
});
273273

274-
test('asyncIterator - do not run mapping after stop-on-error happened', async t => {
274+
test('asyncIterator - all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
275275
const input = [1, async () => delay(300, {value: 2}), 3];
276276
const mappedValues = [];
277277
await t.throwsAsync(
@@ -283,13 +283,13 @@ test('asyncIterator - do not run mapping after stop-on-error happened', async t
283283
mappedValues.push(value);
284284
if (value === 1) {
285285
await delay(100);
286-
throw new Error('Oops!');
286+
throw new Error(`Oops! ${value}`);
287287
}
288-
},
289-
{concurrency: 1})
288+
}),
289+
{message: 'Oops! 1'}
290290
);
291291
await delay(500);
292-
t.deepEqual(mappedValues, [1]);
292+
t.deepEqual(mappedValues, [1, 3, 2]);
293293
});
294294

295295
test('catches exception from source iterator - 1st item', async t => {
@@ -349,3 +349,22 @@ test('catches exception from source iterator - 2nd item after 1st item mapper th
349349
t.is(input.index, 2);
350350
t.deepEqual(mappedValues, [0]);
351351
});
352+
353+
test('asyncIterator - get the correct exception after stop-on-error', async t => {
354+
const input = [1, async () => delay(200, {value: 2}), async () => delay(300, {value: 3})];
355+
const mappedValues = [];
356+
357+
const task = pMap(new AsyncTestData(input), async value => {
358+
if (typeof value === 'function') {
359+
value = await value();
360+
}
361+
362+
mappedValues.push(value);
363+
// Throw for each item - all should fail and we should get only the first
364+
await delay(100);
365+
throw new Error(`Oops! ${value}`);
366+
});
367+
await delay(500);
368+
await t.throwsAsync(task, {message: 'Oops! 1'});
369+
t.deepEqual(mappedValues, [1, 2, 3]);
370+
});

0 commit comments

Comments
 (0)