@@ -3,12 +3,13 @@ const zlib = require('zlib')
3
3
4
4
// Concatenates packets into one batch packet, and adds length prefixs.
5
5
class Framer {
6
- constructor ( compressor , compressionLevel , compressionThreshold ) {
6
+ constructor ( client ) {
7
7
// Encoding
8
8
this . packets = [ ]
9
- this . compressor = compressor || 'none'
10
- this . compressionLevel = compressionLevel
11
- this . compressionThreshold = compressionThreshold
9
+ this . compressor = client . compressionAlgorithm || 'none'
10
+ this . compressionLevel = client . compressionLevel
11
+ this . compressionThreshold = client . compressionThreshold
12
+ this . writeCompressor = client . features . compressorInHeader && client . compressionReady
12
13
}
13
14
14
15
// No compression in base class
@@ -21,30 +22,45 @@ class Framer {
21
22
}
22
23
23
24
static decompress ( algorithm , buffer ) {
24
- try {
25
- switch ( algorithm ) {
26
- case 'deflate' : return zlib . inflateRawSync ( buffer , { chunkSize : 512000 } )
27
- case 'snappy' : throw Error ( 'Snappy compression not implemented' )
28
- case 'none' : return buffer
29
- default : throw Error ( 'Unknown compression type ' + this . compressor )
30
- }
31
- } catch {
32
- return buffer
25
+ switch ( algorithm ) {
26
+ case 0 :
27
+ case 'deflate' :
28
+ return zlib . inflateRawSync ( buffer , { chunkSize : 512000 } )
29
+ case 1 :
30
+ case 'snappy' :
31
+ throw Error ( 'Snappy compression not implemented' )
32
+ case 'none' :
33
+ case 255 :
34
+ return buffer
35
+ default : throw Error ( 'Unknown compression type ' + algorithm )
33
36
}
34
37
}
35
38
36
- static decode ( compressor , buf ) {
39
+ static decode ( client , buf ) {
37
40
// Read header
38
41
if ( buf [ 0 ] !== 0xfe ) throw Error ( 'bad batch packet header ' + buf [ 0 ] )
39
42
const buffer = buf . slice ( 1 )
40
- const decompressed = this . decompress ( compressor , buffer )
43
+ // Decompress
44
+ let decompressed
45
+ if ( client . features . compressorInHeader && client . compressionReady ) {
46
+ decompressed = this . decompress ( buffer [ 0 ] , buffer . slice ( 1 ) )
47
+ } else {
48
+ // On old versions, compressor is session-wide ; failing to decompress
49
+ // a packet will assume it's not compressed
50
+ try {
51
+ decompressed = this . decompress ( client . compressionAlgorithm , buffer )
52
+ } catch ( e ) {
53
+ decompressed = buffer
54
+ }
55
+ }
41
56
return Framer . getPackets ( decompressed )
42
57
}
43
58
44
59
encode ( ) {
45
60
const buf = Buffer . concat ( this . packets )
46
61
const compressed = ( buf . length > this . compressionThreshold ) ? this . compress ( buf ) : buf
47
- return Buffer . concat ( [ Buffer . from ( [ 0xfe ] ) , compressed ] )
62
+ const header = this . writeCompressor ? [ 0xfe , 0 ] : [ 0xfe ]
63
+ return Buffer . concat ( [ Buffer . from ( header ) , compressed ] )
48
64
}
49
65
50
66
addEncodedPacket ( chunk ) {
0 commit comments