@@ -18,6 +18,19 @@ type AddTaskOptions = {
18
18
signal ?: AbortSignal ;
19
19
} ;
20
20
21
+ /**
22
+ * PromiseQueue is a task executor that allows a certain number of concurrent
23
+ * tasks to run at any time.
24
+ * When a task is added it returns a promise that resolves when the underlying task resolves.
25
+ *
26
+ * PromiseQueue is also an EventEmitter for the events:
27
+ * - new_job: When a new job is added to the queue.
28
+ * - finished_job: When a job is finished.
29
+ * - next: When we try to start the next job.
30
+ * - queue_empty: When the last job on queue start running.
31
+ * - idle: When all jobs are done and there is no more jobs to run.
32
+ * - job_start: When we start running a new job.
33
+ */
21
34
export default class PromiseQueue extends EventEmitter {
22
35
#queue: PriorityQueue < ( ) => Promise < void > > ;
23
36
@@ -37,6 +50,10 @@ export default class PromiseQueue extends EventEmitter {
37
50
this . #startInterval( ) ;
38
51
}
39
52
53
+ /**
54
+ * Try to start any jobs we can and set a timeout for the next interval.
55
+ * Will stop the interval if the PromiseQueue is paused.
56
+ */
40
57
#startInterval( ) {
41
58
this . processQueue ( ) ;
42
59
this . #intervalId = setTimeout ( ( ) => {
@@ -48,52 +65,83 @@ export default class PromiseQueue extends EventEmitter {
48
65
} , 1000 ) ;
49
66
}
50
67
68
+ /**
69
+ * If there is an interval scheduled to run, stop and clear internal state.
70
+ */
51
71
#clearInterval( ) {
52
72
if ( this . #intervalId) {
53
73
clearInterval ( this . #intervalId) ;
54
74
this . #intervalId = null ;
55
75
}
56
76
}
57
77
78
+ /**
79
+ * Check idf the PromiseQueue is paused.
80
+ */
58
81
public get isPaused ( ) {
59
82
return this . #isPaused;
60
83
}
61
84
85
+ /**
86
+ * Pause or stop processing the tasks.
87
+ * Does not stop any running tasks.
88
+ */
62
89
public stop ( ) {
63
90
this . #isPaused = true ;
64
91
this . #clearInterval( ) ;
65
92
}
66
93
94
+ /**
95
+ * Unpause or continue processing tasks.
96
+ */
67
97
public continue ( ) {
68
98
this . #isPaused = false ;
69
99
this . #startInterval( ) ;
70
100
}
71
101
102
+ /**
103
+ * Getter for how many jobs are currently running.
104
+ */
72
105
public get jobsRunning ( ) {
73
106
return this . #jobsRunning;
74
107
}
75
108
109
+ /**
110
+ * Called after a task is done, will try to start a new task.
111
+ */
76
112
#next( ) {
77
113
this . emit ( 'next' ) ;
78
114
this . #jobsRunning-- ;
79
115
this . #tryToStartNextJob( ) ;
80
116
}
81
117
118
+ /**
119
+ * Whether we can start a new job.
120
+ */
82
121
get #canStartJob( ) {
83
122
return this . #jobsRunning < this . concurrent ;
84
123
}
85
124
125
+ /**
126
+ * Getter for how many concurrent jobs can run.
127
+ */
86
128
public get concurrent ( ) {
87
129
return this . #allowedConcurrentJobs;
88
130
}
89
131
132
+ /**
133
+ * Setter for concurrent jobs.
134
+ */
90
135
public set concurrent ( value ) {
91
136
if ( value < 1 ) {
92
137
throw new Error ( 'Cannot have less than 1 job running.' ) ;
93
138
}
94
139
this . #allowedConcurrentJobs = value ;
95
140
}
96
141
142
+ /**
143
+ * Check if we can start a new job and start it.
144
+ */
97
145
#tryToStartNextJob( ) : boolean {
98
146
if ( this . #isPaused) {
99
147
return false ;
@@ -121,6 +169,9 @@ export default class PromiseQueue extends EventEmitter {
121
169
return false ;
122
170
}
123
171
172
+ /**
173
+ * When the signal emits an abort event we should reject with the same reason.
174
+ */
124
175
static async throwOnAbort ( signal : AbortSignal ) : Promise < never > {
125
176
return new Promise ( ( _resolve , reject ) => {
126
177
signal . addEventListener (
@@ -141,6 +192,12 @@ export default class PromiseQueue extends EventEmitter {
141
192
while ( this . #tryToStartNextJob( ) ) { }
142
193
}
143
194
195
+ /**
196
+ * Add a new task to the queue, the returned promise will resolve when the task resolves.
197
+ * @param task The underlying job to run.
198
+ * @param options.priority The task priority, the higher it is the sooner the task will run.
199
+ * @param option.signal The `AbortSignal` that can be used to abort the task from the caller.
200
+ */
144
201
async add < TaskResultType > ( task : Task < TaskResultType > , options ?: AddTaskOptions ) {
145
202
this . emit ( 'new_job' ) ;
146
203
return new Promise ( ( resolve , reject ) => {
0 commit comments