3
3
const { WebsocketFrameSend } = require ( './frame' )
4
4
const { opcodes, sendHints } = require ( './constants' )
5
5
6
- /** @type {Uint8Array } */
6
+ /** @type {typeof Uint8Array } */
7
7
const FastBuffer = Buffer [ Symbol . species ]
8
8
9
- class SendQueue {
10
- #queued = new Set ( )
11
- #size = 0
9
+ /**
10
+ * @typedef {object } SendQueueNode
11
+ * @property {SendQueueNode | null } next
12
+ * @property {Promise<void> | null } promise
13
+ * @property {((...args: any[]) => any) } callback
14
+ * @property {Buffer | null } frame
15
+ */
12
16
13
- /** @type {import('net').Socket } */
17
+ class SendQueue {
18
+ /**
19
+ * @type {SendQueueNode | null }
20
+ */
21
+ #head = null
22
+ /**
23
+ * @type {SendQueueNode | null }
24
+ */
25
+ #tail = null
26
+
27
+ /**
28
+ * @type {boolean }
29
+ */
30
+ #running = false
31
+
32
+ /** @type {import('node:net').Socket } */
14
33
#socket
15
34
16
35
constructor ( socket ) {
@@ -19,66 +38,85 @@ class SendQueue {
19
38
20
39
add ( item , cb , hint ) {
21
40
if ( hint !== sendHints . blob ) {
22
- const data = clone ( item , hint )
23
-
24
- if ( this . #size === 0 ) {
25
- this . #dispatch ( data , cb , hint )
41
+ const frame = createFrame ( item , hint )
42
+ if ( ! this . #running ) {
43
+ // fast-path
44
+ this . #socket . write ( frame , cb )
26
45
} else {
27
- this . #queued. add ( [ data , cb , true , hint ] )
28
- this . #size++
29
-
30
- this . #run( )
46
+ /** @type {SendQueueNode } */
47
+ const node = {
48
+ next : null ,
49
+ promise : null ,
50
+ callback : cb ,
51
+ frame
52
+ }
53
+ if ( this . #tail !== null ) {
54
+ this . #tail. next = node
55
+ }
56
+ this . #tail = node
31
57
}
32
-
33
58
return
34
59
}
35
60
36
- const promise = item . arrayBuffer ( )
37
- const queue = [ null , cb , false , hint ]
38
- promise . then ( ( ab ) => {
39
- queue [ 0 ] = clone ( ab , hint )
40
- queue [ 2 ] = true
41
-
42
- this . #run( )
43
- } )
44
-
45
- this . #queued. add ( queue )
46
- this . #size++
47
- }
48
-
49
- #run ( ) {
50
- for ( const queued of this . #queued) {
51
- const [ data , cb , done , hint ] = queued
61
+ /** @type {SendQueueNode } */
62
+ const node = {
63
+ next : null ,
64
+ promise : item . arrayBuffer ( ) . then ( ( ab ) => {
65
+ node . promise = null
66
+ node . frame = createFrame ( ab , hint )
67
+ } ) ,
68
+ callback : cb ,
69
+ frame : null
70
+ }
52
71
53
- if ( ! done ) return
72
+ if ( this . #tail === null ) {
73
+ this . #tail = node
74
+ }
54
75
55
- this . #queued. delete ( queued )
56
- this . #size--
76
+ if ( this . #head === null ) {
77
+ this . #head = node
78
+ }
57
79
58
- this . #dispatch( data , cb , hint )
80
+ if ( ! this . #running) {
81
+ this . #run( )
59
82
}
60
83
}
61
84
62
- #dispatch ( data , cb , hint ) {
63
- const frame = new WebsocketFrameSend ( )
64
- const opcode = hint === sendHints . string ? opcodes . TEXT : opcodes . BINARY
65
-
66
- frame . frameData = data
67
- const buffer = frame . createFrame ( opcode )
68
-
69
- this . #socket. write ( buffer , cb )
85
+ async #run ( ) {
86
+ this . #running = true
87
+ /** @type {SendQueueNode | null } */
88
+ let node = this . #head
89
+ while ( node !== null ) {
90
+ // wait pending promise
91
+ if ( node . promise !== null ) {
92
+ await node . promise
93
+ }
94
+ // write
95
+ this . #socket. write ( node . frame , node . callback )
96
+ // cleanup
97
+ node . callback = node . frame = null
98
+ // set next
99
+ node = node . next
100
+ }
101
+ this . #head = null
102
+ this . #tail = null
103
+ this . #running = false
70
104
}
71
105
}
72
106
73
- function clone ( data , hint ) {
107
+ function createFrame ( data , hint ) {
108
+ return new WebsocketFrameSend ( toBuffer ( data , hint ) ) . createFrame ( hint === sendHints . string ? opcodes . TEXT : opcodes . BINARY )
109
+ }
110
+
111
+ function toBuffer ( data , hint ) {
74
112
switch ( hint ) {
75
113
case sendHints . string :
76
114
return Buffer . from ( data )
77
115
case sendHints . arrayBuffer :
78
116
case sendHints . blob :
79
117
return new FastBuffer ( data )
80
118
case sendHints . typedArray :
81
- return Buffer . copyBytesFrom ( data )
119
+ return new FastBuffer ( data . buffer , data . byteOffset , data . byteLength )
82
120
}
83
121
}
84
122
0 commit comments