Skip to content

Commit 063715f

Browse files
r4mmeralexruzenhackandreabadesso
authored
feat: promise queue implementation (#771)
* feat: promise queue implementation * chore: linter changes * tests: promise and priority queue tests * chore: linter changes * Apply suggestions from code review Co-authored-by: Alex Ruzenhack <[email protected]> * chore: docstrings * chore: review requests * Update src/models/promise_queue.ts Co-authored-by: André Abadesso <[email protected]> * chore: change pop empty check --------- Co-authored-by: Alex Ruzenhack <[email protected]> Co-authored-by: André Abadesso <[email protected]>
1 parent fab4f26 commit 063715f

File tree

9 files changed

+530
-76
lines changed

9 files changed

+530
-76
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Copyright (c) Hathor Labs and its affiliates.
3+
*
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
8+
import PriorityQueue from '../../src/models/priority_queue';
9+
10+
test('PriorityQueue operations', () => {
11+
const q = new PriorityQueue<string>();
12+
expect(q.size).toEqual(0);
13+
// method: makeNode
14+
expect(PriorityQueue.makeNode(123, 'foobar')).toMatchObject({
15+
priority: 123,
16+
value: 'foobar',
17+
});
18+
19+
// methods: push and pop
20+
q.push(PriorityQueue.makeNode(10, 'foobar1'));
21+
expect(q.size).toEqual(1);
22+
expect(q.isEmpty()).toBeFalsy();
23+
expect(q.pop()).toEqual('foobar1');
24+
expect(q.isEmpty()).toBeTruthy();
25+
26+
// method: add
27+
const elements = [
28+
{ priority: 10, value: 'foo' },
29+
{ priority: 1, value: 'baz' },
30+
{ priority: 5, value: 'bar' },
31+
].map(el => PriorityQueue.makeNode(el.priority, el.value));
32+
q.add(...elements);
33+
expect(q.size).toEqual(3);
34+
expect(q.isEmpty()).toBeFalsy();
35+
expect(q.pop()).toEqual('foo');
36+
expect(q.size).toEqual(2);
37+
expect(q.isEmpty()).toBeFalsy();
38+
expect(q.peek()).toEqual('bar');
39+
expect(q.pop()).toEqual('bar');
40+
expect(q.size).toEqual(1);
41+
expect(q.isEmpty()).toBeFalsy();
42+
expect(q.peek()).toEqual('baz');
43+
expect(q.pop()).toEqual('baz');
44+
expect(q.size).toEqual(0);
45+
expect(q.isEmpty()).toBeTruthy();
46+
});
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Copyright (c) Hathor Labs and its affiliates.
3+
*
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
8+
import PromiseQueue from '../../src/models/promise_queue';
9+
10+
describe('PromiseQueue operations', () => {
11+
it('should start unpaused and be pausable', () => {
12+
const q = new PromiseQueue();
13+
expect(q.isPaused).toBeFalsy();
14+
q.stop();
15+
expect(q.isPaused).toBeTruthy();
16+
q.continue();
17+
expect(q.isPaused).toBeFalsy();
18+
});
19+
20+
it('should be able to set the concurrency', async () => {
21+
const q = new PromiseQueue();
22+
expect(q.concurrent).toEqual(1);
23+
q.concurrent = 3;
24+
expect(q.concurrent).toEqual(3);
25+
26+
// Check that the concurrent actually runs only 2 tasks
27+
q.concurrent = 2;
28+
29+
const task = async () => {
30+
return new Promise<void>(resolve => {
31+
setTimeout(resolve, 2000);
32+
});
33+
};
34+
q.add(task);
35+
q.add(task);
36+
q.add(task);
37+
// wait 50ms so jobs can start
38+
await new Promise<void>(resolve => {
39+
setTimeout(resolve, 50);
40+
});
41+
// Expect that only 2 jobs are running
42+
expect(q.jobsRunning).toEqual(2);
43+
44+
// Wait for either all tasks to finish or a timeout.
45+
await Promise.race([
46+
new Promise<void>(resolve => {
47+
// idle is sent when the queue is empty and there are no events running
48+
q.on('idle', resolve);
49+
}),
50+
new Promise<void>((_, reject) => {
51+
setTimeout(reject, 5000);
52+
}),
53+
]);
54+
});
55+
});

babel.config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@ module.exports = {
1515
plugins: [
1616
"@babel/plugin-transform-async-generator-functions",
1717
"@babel/plugin-transform-class-properties",
18+
"@babel/plugin-transform-private-methods",
1819
],
1920
};

package-lock.json

Lines changed: 0 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
"level": "8.0.1",
2525
"lodash": "4.17.21",
2626
"long": "5.2.3",
27-
"queue-promise": "^2.2.1",
2827
"ws": "8.17.1"
2928
},
3029
"scripts": {

src/models/priority_queue.ts

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/**
2+
* Copyright (c) Hathor Labs and its affiliates.
3+
*
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
/* eslint class-methods-use-this: ["error", { "exceptMethods": ["parent", "left", "right", "comparator"] }] */
8+
9+
interface PriorityQueueNode<T> {
10+
value: T;
11+
priority: number;
12+
}
13+
14+
/**
15+
* Priority queue implementation using an underlying heap for performance.
16+
* @example
17+
* const prioQ = new PriorityQueue<string>();
18+
* prioQ.push(prioQ.makeNode(0, 'lower prio'));
19+
* prioQ.push(prioQ.makeNode(10, 'higher prio'));
20+
* prioQ.push(prioQ.makeNode(5, 'medium prio'));
21+
* // Returns 'higher prio'
22+
* prioQ.pop()
23+
* // Returns 'medium prio'
24+
* prioQ.pop()
25+
* // Returns 'lower prio'
26+
* prioQ.pop()
27+
*
28+
* @template [T=unknown]
29+
*/
30+
export default class PriorityQueue<T = unknown> {
31+
#heap: PriorityQueueNode<T>[];
32+
33+
readonly #top: number;
34+
35+
constructor() {
36+
this.#heap = [];
37+
this.#top = 0;
38+
}
39+
40+
/**
41+
* Utility to create a node from a value and priority.
42+
* @example
43+
* // returns { value: 'foobar', 'priority': 10 }
44+
* prioQueue.makeNode(10, 'foobar');
45+
*/
46+
static makeNode<N>(priority: number, value: N): PriorityQueueNode<N> {
47+
return { value, priority };
48+
}
49+
50+
/**
51+
* Get the number of elements on the priority queue.
52+
*/
53+
public get size() {
54+
return this.#heap.length;
55+
}
56+
57+
/**
58+
* Check if the priority queue is empty
59+
*/
60+
public isEmpty() {
61+
return this.size === 0;
62+
}
63+
64+
/**
65+
* Get the top value of the queue without removing it from the queue.
66+
*/
67+
public peek() {
68+
if (this.size === 0) {
69+
// Heap is empty
70+
return undefined;
71+
}
72+
return this.#heap[this.#top].value;
73+
}
74+
75+
/**
76+
* Add a node to the priority queue and maintain the priority order.
77+
*/
78+
public push(value: PriorityQueueNode<T>) {
79+
this.#heap.push(value);
80+
this._siftUp();
81+
}
82+
83+
/**
84+
* Add multiple values to the priority queue while maintaining the priority order.
85+
*/
86+
public add(...nodes: PriorityQueueNode<T>[]) {
87+
nodes.forEach(node => {
88+
this.push(node);
89+
});
90+
return this.size;
91+
}
92+
93+
/**
94+
* Get the node with highest priority and remove it from the priority queue.
95+
*/
96+
public pop() {
97+
if (this.isEmpty()) {
98+
// Queue is empty
99+
return undefined;
100+
}
101+
// This may be undefined if the queue is empty, but we already checked for that.
102+
const poppedValue = this.peek();
103+
const bottom = this.size - 1;
104+
if (bottom > this.#top) {
105+
this._swap(this.#top, bottom);
106+
}
107+
this.#heap.pop();
108+
this._siftDown();
109+
return poppedValue;
110+
}
111+
112+
/** Compare 2 nodes and return true if the left one has higher priority. */
113+
private comparator(a: PriorityQueueNode<T>, b: PriorityQueueNode<T>): boolean {
114+
return a.priority > b.priority;
115+
}
116+
117+
/** Given a node index on the heap get the parent index */
118+
private parent(i: number) {
119+
return ((i + 1) >>> 1) - 1;
120+
}
121+
122+
/** Given a node index on the heap get the left child index */
123+
private left(i: number) {
124+
return (i << 1) + 1;
125+
}
126+
127+
/** Given a node index on the heap get the right child index */
128+
private right(i: number) {
129+
return (i + 1) << 1;
130+
}
131+
132+
/**
133+
* Compare the nodes at index `i`, `j` on the heap
134+
* Return true if the node at `i` is higher priority than the node at `j`
135+
* Return false otherwise.
136+
*/
137+
private _greater(i: number, j: number) {
138+
return this.comparator(this.#heap[i], this.#heap[j]);
139+
}
140+
141+
/** swap the nodes at index `i` and `j` on the heap */
142+
private _swap(i: number, j: number) {
143+
[this.#heap[i], this.#heap[j]] = [this.#heap[j], this.#heap[i]];
144+
}
145+
146+
/**
147+
* The last node of the heap will work its way up the heap until it meets a node
148+
* of higher priority or reaches the top of the heap.
149+
*/
150+
private _siftUp() {
151+
// Start from the last index and work our way up the heap
152+
let node = this.size - 1;
153+
// While the current node is not at the top and the priority is greater
154+
// than the parent we continue sifting up
155+
while (node > this.#top && this._greater(node, this.parent(node))) {
156+
// parent is lower priority, swap with child (current node)
157+
this._swap(node, this.parent(node));
158+
// Start from the parent
159+
node = this.parent(node);
160+
}
161+
}
162+
163+
/**
164+
* The top node of the heap will work its way down until no child is of higher
165+
* priority or it reaches the bottom of the heap.
166+
*/
167+
private _siftDown() {
168+
// Start from the top index and work our way down the heap
169+
let node = this.#top;
170+
// If a child is in the heap and has higher priority, swap with parent and go down
171+
while (
172+
(this.left(node) < this.size && this._greater(this.left(node), node)) ||
173+
(this.right(node) < this.size && this._greater(this.right(node), node))
174+
) {
175+
// Get the child with higher priority
176+
const maxChild =
177+
this.right(node) < this.size && this._greater(this.right(node), this.left(node))
178+
? this.right(node)
179+
: this.left(node);
180+
// Swap with parent and continue sifting down from the child
181+
this._swap(node, maxChild);
182+
// Start from the child
183+
node = maxChild;
184+
}
185+
}
186+
}

0 commit comments

Comments
 (0)