Skip to content

Commit d09c989

Browse files
committed
feat: promise queue implementation
1 parent 420d258 commit d09c989

File tree

7 files changed

+340
-76
lines changed

7 files changed

+340
-76
lines changed

babel.config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@ module.exports = {
1515
plugins: [
1616
"@babel/plugin-transform-async-generator-functions",
1717
"@babel/plugin-transform-class-properties",
18+
"@babel/plugin-transform-private-methods",
1819
],
1920
};

package-lock.json

Lines changed: 0 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
"level": "8.0.1",
2525
"lodash": "4.17.21",
2626
"long": "5.2.3",
27-
"queue-promise": "^2.2.1",
2827
"ws": "8.17.1"
2928
},
3029
"scripts": {

src/models/priority_queue.ts

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/* eslint class-methods-use-this: ["error", { "exceptMethods": ["parent", "left", "right", "comparator"] }] */
2+
3+
interface PriorityQueueNode<T> {
4+
value: T;
5+
priority: number;
6+
}
7+
8+
/**
9+
* Priority queue implementation using an underlying heap for performance.
10+
* @example
11+
* const prioQ = new PriorityQueue<string>();
12+
* prioQ.push(prioQ.makeNode(0, 'lower prio'));
13+
* prioQ.push(prioQ.makeNode(10, 'higher prio'));
14+
* prioQ.push(prioQ.makeNode(5, 'medium prio'));
15+
* // Returns 'higher prio'
16+
* prioQ.pop()
17+
* // Returns 'medium prio'
18+
* prioQ.pop()
19+
* // Returns 'lower prio'
20+
* prioQ.pop()
21+
*
22+
* @template [T=unknown]
23+
*/
24+
export default class PriorityQueue<T = unknown> {
25+
#heap: PriorityQueueNode<T>[];
26+
27+
readonly #top: number;
28+
29+
constructor() {
30+
this.#heap = [];
31+
this.#top = 0;
32+
}
33+
34+
/**
35+
* Utility to create a node from a value and priority.
36+
* @example
37+
* // returns { value: 'foobar', 'priority': 10 }
38+
* prioQueue.makeNode(10, 'foobar');
39+
*/
40+
static makeNode<N>(priority: number, value: N): PriorityQueueNode<N> {
41+
return { value, priority };
42+
}
43+
44+
/**
45+
* Get the number of elements on the priority queue.
46+
*/
47+
public get size() {
48+
return this.#heap.length;
49+
}
50+
51+
/**
52+
* Check if the priority queue is empty
53+
*/
54+
public isEmpty() {
55+
return this.size === 0;
56+
}
57+
58+
/**
59+
* Get the top value of the queue without removing it from the queue.
60+
*/
61+
public peek() {
62+
if (this.size === 0) {
63+
// Heap is empty
64+
return undefined;
65+
}
66+
return this.#heap[this.#top].value;
67+
}
68+
69+
/**
70+
* Add a node to the priority queue and maintain the priority order.
71+
*/
72+
public push(value: PriorityQueueNode<T>) {
73+
this.#heap.push(value);
74+
this._siftUp();
75+
}
76+
77+
/**
78+
* Add multiple values to the priority queue while maintaining the priority order.
79+
*/
80+
public add(...values: PriorityQueueNode<T>[]) {
81+
values.forEach(value => {
82+
this.push(value);
83+
});
84+
return this.size;
85+
}
86+
87+
/**
88+
* Get the node with highest priority and remove it from the priority queue.
89+
*/
90+
public pop() {
91+
const poppedValue = this.peek();
92+
// Check if poppedValue is null or undefined, meaning the queue is empty
93+
if (!(poppedValue ?? false)) {
94+
return undefined;
95+
}
96+
const bottom = this.size - 1;
97+
if (bottom > this.#top) {
98+
this._swap(this.#top, bottom);
99+
}
100+
this.#heap.pop();
101+
this._siftDown();
102+
return poppedValue;
103+
}
104+
105+
/** Compare 2 nodes and return true if the left one has higher priority. */
106+
private comparator(a: PriorityQueueNode<T>, b: PriorityQueueNode<T>): boolean {
107+
return a.priority > b.priority;
108+
}
109+
110+
/** Given a node index on the heap get the parent index */
111+
private parent(i: number) {
112+
return ((i + 1) >>> 1) - 1;
113+
}
114+
115+
/** Given a node index on the heap get the left child index */
116+
private left(i: number) {
117+
return (i << 1) + 1;
118+
}
119+
120+
/** Given a node index on the heap get the right child index */
121+
private right(i: number) {
122+
return (i + 1) << 1;
123+
}
124+
125+
/**
126+
* Compare the nodes at index `i`, `j` on the heap
127+
* Return true if the node at `i` is higher priority than the node at `j`
128+
* Return false otherwise.
129+
*/
130+
private _greater(i: number, j: number) {
131+
return this.comparator(this.#heap[i], this.#heap[j]);
132+
}
133+
134+
/** swap the nodes at index `i` and `j` on the heap */
135+
private _swap(i: number, j: number) {
136+
[this.#heap[i], this.#heap[j]] = [this.#heap[j], this.#heap[i]];
137+
}
138+
139+
/**
140+
* The last node of the heap will work its way up the heap until it meets a node
141+
* of higher priority or reaches the top of the heap.
142+
*/
143+
private _siftUp() {
144+
// Start from the last index and work our way up the heap
145+
let node = this.size - 1;
146+
// While the current node is not at the top and the priority is greater
147+
// than the parent we continue sifting up
148+
while (node > this.#top && this._greater(node, this.parent(node))) {
149+
// parent is lower priority, swap with child (current node)
150+
this._swap(node, this.parent(node));
151+
// Start from the parent
152+
node = this.parent(node);
153+
}
154+
}
155+
156+
/**
157+
* The top node of the heap will work its way down until no child is of higher
158+
* priority or it reaches the bottom of the heap.
159+
*/
160+
private _siftDown() {
161+
// Start from the top index and work our way down the heap
162+
let node = this.#top;
163+
// If a child is in the heap and has higher prioroty, swap with parent and go down
164+
while (
165+
(this.left(node) < this.size && this._greater(this.left(node), node)) ||
166+
(this.right(node) < this.size && this._greater(this.right(node), node))
167+
) {
168+
// Get the child with higher priority
169+
const maxChild =
170+
this.right(node) < this.size && this._greater(this.right(node), this.left(node))
171+
? this.right(node)
172+
: this.left(node);
173+
// Swap with parent and continue sifting down from the child
174+
this._swap(node, maxChild);
175+
// Start from the child
176+
node = maxChild;
177+
}
178+
}
179+
}

src/models/promise_queue.ts

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import EventEmitter from 'events';
2+
import PriorityQueue from './priority_queue';
3+
4+
type TaskOptions = { signal?: AbortSignal; };
5+
6+
type Task<TaskResultType> =
7+
| ((options?: TaskOptions) => PromiseLike<TaskResultType>)
8+
| ((options?: TaskOptions) => TaskResultType);
9+
10+
type AddTaskOptions = {
11+
priority?: number;
12+
signal?: AbortSignal;
13+
}
14+
15+
export default class PromiseQueue extends EventEmitter {
16+
#queue: PriorityQueue<() => Promise<void>>;
17+
18+
// Number of active jobs currently running.
19+
#jobsRunning: number;
20+
21+
#allowedConcurrentJobs: number = 1;
22+
23+
#isPaused = false;
24+
25+
#intervalId: ReturnType<typeof setInterval>|null = null;
26+
27+
constructor() {
28+
super();
29+
this.#queue = new PriorityQueue<() => Promise<void>>();
30+
this.#jobsRunning = 0;
31+
this.#startInterval();
32+
}
33+
34+
#startInterval() {
35+
this.#clearInterval();
36+
this.#intervalId = setInterval(() => {
37+
this.processQueue();
38+
}, 1000);
39+
}
40+
41+
#clearInterval() {
42+
if (this.#intervalId) {
43+
clearInterval(this.#intervalId);
44+
this.#intervalId = null;
45+
}
46+
}
47+
48+
get isPaused() {
49+
return this.#isPaused;
50+
}
51+
52+
stop() {
53+
this.#isPaused = true;
54+
this.#clearInterval();
55+
}
56+
57+
continue() {
58+
this.#isPaused = false;
59+
this.processQueue();
60+
this.#startInterval();
61+
}
62+
63+
#next() {
64+
this.emit('next')
65+
this.#jobsRunning--;
66+
this.#tryToStartNextJob();
67+
}
68+
69+
get #canStartJob() {
70+
return this.#jobsRunning < this.concurrent;
71+
}
72+
73+
get concurrent() {
74+
return this.#allowedConcurrentJobs;
75+
}
76+
77+
set concurrent(value) {
78+
if (value < 1) {
79+
throw new Error('Cannot have less than 1 job running.');
80+
}
81+
this.#allowedConcurrentJobs = value;
82+
}
83+
84+
#tryToStartNextJob(): boolean {
85+
if (this.#queue.isEmpty()) {
86+
// No more tasks to run
87+
this.emit('queue_empty');
88+
if (this.#jobsRunning === 0) {
89+
this.emit('idle');
90+
}
91+
return false;
92+
}
93+
94+
if (this.#canStartJob) {
95+
const job = this.#queue.pop();
96+
if (!job) {
97+
// Should never happen, but treating for typing
98+
return false;
99+
}
100+
this.emit('job_start');
101+
job();
102+
return true;
103+
}
104+
105+
return false;
106+
}
107+
108+
static async throwOnAbort(signal: AbortSignal): Promise<never> {
109+
return new Promise((_resolve, reject) => {
110+
signal.addEventListener('abort', () => {
111+
reject(signal.reason);
112+
}, {once: true});
113+
});
114+
}
115+
116+
/**
117+
* Try to start jobs until the concurrency limit is reached.
118+
*/
119+
processQueue(): void {
120+
while (this.#tryToStartNextJob()) {}
121+
}
122+
123+
async add<TaskResultType>(task: Task<TaskResultType>, options?: AddTaskOptions) {
124+
this.emit('new_job');
125+
return new Promise((resolve, reject) => {
126+
this.#queue.push(PriorityQueue.makeNode(options?.priority || 0, async () => {
127+
this.#jobsRunning++;
128+
try {
129+
// Throw if the operation was aborted and don't run.
130+
options?.signal?.throwIfAborted();
131+
132+
// Run the task until completion or until the task aborts.
133+
let operation = task({signal: options?.signal});
134+
// If a signal was passed, we may abort the operation from the outside.
135+
// This does not abort internally the task, it has to also manage the abort signal.
136+
if (options?.signal) {
137+
operation = Promise.race([operation, PromiseQueue.throwOnAbort(options.signal)]);
138+
}
139+
const result = await operation;
140+
resolve(result);
141+
// Completed task
142+
} catch (error: unknown) {
143+
reject(error);
144+
} finally {
145+
this.emit('finished_job');
146+
this.#next();
147+
}
148+
}));
149+
});
150+
}
151+
}

src/new/wallet.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ import { MemoryStore, Storage } from '../storage';
4949
import { deriveAddressP2PKH, deriveAddressP2SH, getAddressFromPubkey } from '../utils/address';
5050
import NanoContractTransactionBuilder from '../nano_contracts/builder';
5151
import { prepareNanoSendTransaction } from '../nano_contracts/utils';
52-
import { addTask } from '../sync/gll';
52+
import GLL from '../sync/gll';
5353

5454
const ERROR_MESSAGE_PIN_REQUIRED = 'Pin is required.';
5555

@@ -2859,9 +2859,11 @@ class HathorWallet extends EventEmitter {
28592859
throw new Error('Trying to use an unsupported sync method for this wallet.');
28602860
}
28612861
const syncMethod = getHistorySyncMethod(this.historySyncMode);
2862-
await addTask(async () => {
2862+
// This will add the task to the GLL queue and return a promise that
2863+
// resolves when the task finishes executing
2864+
await GLL.add(async () => {
28632865
await syncMethod(startIndex, count, this.storage, this.conn, shouldProcessHistory);
2864-
}, this.logger);
2866+
});
28652867
}
28662868

28672869
/**

0 commit comments

Comments
 (0)