1
1
import EventEmitter from 'events' ;
2
2
import PriorityQueue from './priority_queue' ;
3
3
4
- type TaskOptions = { signal ?: AbortSignal ; } ;
4
+ type TaskOptions = { signal ?: AbortSignal } ;
5
5
6
6
type Task < TaskResultType > =
7
- | ( ( options ?: TaskOptions ) => PromiseLike < TaskResultType > )
8
- | ( ( options ?: TaskOptions ) => TaskResultType ) ;
7
+ | ( ( options ?: TaskOptions ) => PromiseLike < TaskResultType > )
8
+ | ( ( options ?: TaskOptions ) => TaskResultType ) ;
9
9
10
10
type AddTaskOptions = {
11
11
priority ?: number ;
12
12
signal ?: AbortSignal ;
13
- }
14
-
13
+ } ;
14
+
15
15
export default class PromiseQueue extends EventEmitter {
16
16
#queue: PriorityQueue < ( ) => Promise < void > > ;
17
17
@@ -22,7 +22,7 @@ export default class PromiseQueue extends EventEmitter {
22
22
23
23
#isPaused = false ;
24
24
25
- #intervalId: ReturnType < typeof setInterval > | null = null ;
25
+ #intervalId: ReturnType < typeof setInterval > | null = null ;
26
26
27
27
constructor ( ) {
28
28
super ( ) ;
@@ -61,7 +61,7 @@ export default class PromiseQueue extends EventEmitter {
61
61
}
62
62
63
63
#next( ) {
64
- this . emit ( 'next' )
64
+ this . emit ( 'next' ) ;
65
65
this . #jobsRunning-- ;
66
66
this . #tryToStartNextJob( ) ;
67
67
}
@@ -105,47 +105,54 @@ export default class PromiseQueue extends EventEmitter {
105
105
return false ;
106
106
}
107
107
108
- static async throwOnAbort ( signal : AbortSignal ) : Promise < never > {
109
- return new Promise ( ( _resolve , reject ) => {
110
- signal . addEventListener ( 'abort' , ( ) => {
111
- reject ( signal . reason ) ;
112
- } , { once : true } ) ;
113
- } ) ;
114
- }
115
-
116
- /**
117
- * Try to start jobs until the concurrency limit is reached.
118
- */
119
- processQueue ( ) : void {
120
- while ( this . #tryToStartNextJob( ) ) { }
121
- }
108
+ static async throwOnAbort ( signal : AbortSignal ) : Promise < never > {
109
+ return new Promise ( ( _resolve , reject ) => {
110
+ signal . addEventListener (
111
+ 'abort' ,
112
+ ( ) => {
113
+ reject ( signal . reason ) ;
114
+ } ,
115
+ { once : true }
116
+ ) ;
117
+ } ) ;
118
+ }
119
+
120
+ /**
121
+ * Try to start jobs until the concurrency limit is reached.
122
+ */
123
+ processQueue ( ) : void {
124
+ // eslint-disable-next-line no-empty
125
+ while ( this . #tryToStartNextJob( ) ) { }
126
+ }
122
127
123
128
async add < TaskResultType > ( task : Task < TaskResultType > , options ?: AddTaskOptions ) {
124
129
this . emit ( 'new_job' ) ;
125
130
return new Promise ( ( resolve , reject ) => {
126
- this . #queue. push ( PriorityQueue . makeNode ( options ?. priority || 0 , async ( ) => {
127
- this . #jobsRunning++ ;
128
- try {
129
- // Throw if the operation was aborted and don't run.
130
- options ?. signal ?. throwIfAborted ( ) ;
131
-
132
- // Run the task until completion or until the task aborts.
133
- let operation = task ( { signal : options ?. signal } ) ;
134
- // If a signal was passed, we may abort the operation from the outside.
135
- // This does not abort internally the task, it has to also manage the abort signal.
136
- if ( options ?. signal ) {
137
- operation = Promise . race ( [ operation , PromiseQueue . throwOnAbort ( options . signal ) ] ) ;
131
+ this . #queue. push (
132
+ PriorityQueue . makeNode ( options ?. priority || 0 , async ( ) => {
133
+ this . #jobsRunning++ ;
134
+ try {
135
+ // Throw if the operation was aborted and don't run.
136
+ options ?. signal ?. throwIfAborted ( ) ;
137
+
138
+ // Run the task until completion or until the task aborts.
139
+ let operation = task ( { signal : options ?. signal } ) ;
140
+ // If a signal was passed, we may abort the operation from the outside.
141
+ // This does not abort internally the task, it has to also manage the abort signal.
142
+ if ( options ?. signal ) {
143
+ operation = Promise . race ( [ operation , PromiseQueue . throwOnAbort ( options . signal ) ] ) ;
144
+ }
145
+ const result = await operation ;
146
+ resolve ( result ) ;
147
+ // Completed task
148
+ } catch ( error : unknown ) {
149
+ reject ( error ) ;
150
+ } finally {
151
+ this . emit ( 'finished_job' ) ;
152
+ this . #next( ) ;
138
153
}
139
- const result = await operation ;
140
- resolve ( result ) ;
141
- // Completed task
142
- } catch ( error : unknown ) {
143
- reject ( error ) ;
144
- } finally {
145
- this . emit ( 'finished_job' ) ;
146
- this . #next( ) ;
147
- }
148
- } ) ) ;
154
+ } )
155
+ ) ;
149
156
} ) ;
150
157
}
151
158
}
0 commit comments