Skip to content

Commit 7da4f61

Browse files
committed
tests: promise and priority queue tests
1 parent 6405819 commit 7da4f61

File tree

4 files changed

+136
-10
lines changed

4 files changed

+136
-10
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Copyright (c) Hathor Labs and its affiliates.
3+
*
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
8+
import PriorityQueue from '../../src/models/priority_queue';
9+
10+
test('PriorityQueue operations', () => {
11+
const q = new PriorityQueue<string>();
12+
expect(q.size).toEqual(0);
13+
// method: makeNode
14+
expect(PriorityQueue.makeNode(123, 'foobar')).toMatchObject({
15+
priority: 123,
16+
value: 'foobar',
17+
});
18+
19+
// methods: push and pop
20+
q.push(PriorityQueue.makeNode(10, 'foobar1'));
21+
expect(q.size).toEqual(1);
22+
expect(q.isEmpty()).toBeFalsy();
23+
expect(q.pop()).toEqual('foobar1');
24+
expect(q.isEmpty()).toBeTruthy();
25+
26+
// method: add
27+
const elements = [
28+
{ priority: 10, value: 'foo' },
29+
{ priority: 1, value: 'baz' },
30+
{ priority: 5, value: 'bar' },
31+
].map(el => PriorityQueue.makeNode(el.priority, el.value));
32+
q.add(...elements);
33+
expect(q.size).toEqual(3);
34+
expect(q.isEmpty()).toBeFalsy();
35+
expect(q.pop()).toEqual('foo');
36+
expect(q.size).toEqual(2);
37+
expect(q.isEmpty()).toBeFalsy();
38+
expect(q.peek()).toEqual('bar');
39+
expect(q.pop()).toEqual('bar');
40+
expect(q.size).toEqual(1);
41+
expect(q.isEmpty()).toBeFalsy();
42+
expect(q.peek()).toEqual('baz');
43+
expect(q.pop()).toEqual('baz');
44+
expect(q.size).toEqual(0);
45+
expect(q.isEmpty()).toBeTruthy();
46+
});
47+
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Copyright (c) Hathor Labs and its affiliates.
3+
*
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
8+
import PromiseQueue from '../../src/models/promise_queue';
9+
10+
describe('PromiseQueue operations', () => {
11+
it('should start unpaused and be pausable', () => {
12+
const q = new PromiseQueue();
13+
expect(q.isPaused).toBeFalsy();
14+
q.stop();
15+
expect(q.isPaused).toBeTruthy();
16+
q.continue();
17+
expect(q.isPaused).toBeFalsy();
18+
});
19+
20+
it('should be able to set the concurrency', async () => {
21+
const q = new PromiseQueue();
22+
expect(q.concurrent).toEqual(1);
23+
q.concurrent = 3;
24+
expect(q.concurrent).toEqual(3);
25+
26+
// Check that the concurrent actually runs only 2 tasks
27+
q.concurrent = 2;
28+
29+
const task = async () => {
30+
return new Promise<void>(resolve => {
31+
setTimeout(resolve, 2000);
32+
});
33+
};
34+
q.add(task);
35+
q.add(task);
36+
q.add(task);
37+
// wait 50ms so jobs can start
38+
await new Promise<void>(resolve => {
39+
setTimeout(resolve, 50);
40+
});
41+
// Expect that only 2 jobs are running
42+
expect(q.jobsRunning).toEqual(2);
43+
44+
// Wait for either all tasks to finish or a timeout.
45+
await Promise.race([
46+
new Promise<void>(resolve => {
47+
// idle is sent when the queue is empty and there are no events running
48+
q.on('idle', resolve);
49+
}),
50+
new Promise<void>((_, reject) => {
51+
setTimeout(reject, 5000);
52+
}),
53+
]);
54+
});
55+
});

src/models/priority_queue.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
/**
2+
* Copyright (c) Hathor Labs and its affiliates.
3+
*
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
17
/* eslint class-methods-use-this: ["error", { "exceptMethods": ["parent", "left", "right", "comparator"] }] */
28

39
interface PriorityQueueNode<T> {

src/models/promise_queue.ts

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
/**
2+
* Copyright (c) Hathor Labs and its affiliates.
3+
*
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
17
import EventEmitter from 'events';
28
import PriorityQueue from './priority_queue';
39

@@ -22,7 +28,7 @@ export default class PromiseQueue extends EventEmitter {
2228

2329
#isPaused = false;
2430

25-
#intervalId: ReturnType<typeof setInterval> | null = null;
31+
#intervalId: ReturnType<typeof setTimeout> | null = null;
2632

2733
constructor() {
2834
super();
@@ -32,9 +38,13 @@ export default class PromiseQueue extends EventEmitter {
3238
}
3339

3440
#startInterval() {
35-
this.#clearInterval();
36-
this.#intervalId = setInterval(() => {
37-
this.processQueue();
41+
this.processQueue();
42+
this.#intervalId = setTimeout(() => {
43+
if (this.#isPaused) {
44+
this.#clearInterval();
45+
return;
46+
}
47+
this.#startInterval();
3848
}, 1000);
3949
}
4050

@@ -45,21 +55,24 @@ export default class PromiseQueue extends EventEmitter {
4555
}
4656
}
4757

48-
get isPaused() {
58+
public get isPaused() {
4959
return this.#isPaused;
5060
}
5161

52-
stop() {
62+
public stop() {
5363
this.#isPaused = true;
5464
this.#clearInterval();
5565
}
5666

57-
continue() {
67+
public continue() {
5868
this.#isPaused = false;
59-
this.processQueue();
6069
this.#startInterval();
6170
}
6271

72+
public get jobsRunning() {
73+
return this.#jobsRunning;
74+
}
75+
6376
#next() {
6477
this.emit('next');
6578
this.#jobsRunning--;
@@ -70,18 +83,21 @@ export default class PromiseQueue extends EventEmitter {
7083
return this.#jobsRunning < this.concurrent;
7184
}
7285

73-
get concurrent() {
86+
public get concurrent() {
7487
return this.#allowedConcurrentJobs;
7588
}
7689

77-
set concurrent(value) {
90+
public set concurrent(value) {
7891
if (value < 1) {
7992
throw new Error('Cannot have less than 1 job running.');
8093
}
8194
this.#allowedConcurrentJobs = value;
8295
}
8396

8497
#tryToStartNextJob(): boolean {
98+
if (this.#isPaused) {
99+
return false;
100+
}
85101
if (this.#queue.isEmpty()) {
86102
// No more tasks to run
87103
this.emit('queue_empty');
@@ -153,6 +169,8 @@ export default class PromiseQueue extends EventEmitter {
153169
}
154170
})
155171
);
172+
// Try to start the job we enqueued and any other we can start
173+
this.processQueue();
156174
});
157175
}
158176
}

0 commit comments

Comments
 (0)