@@ -21,7 +21,7 @@ import { WriteObject, WriteFlags } from './call-interface';
21
21
import { Channel } from './channel' ;
22
22
import { ChannelOptions } from './channel-options' ;
23
23
import { CompressionAlgorithms } from './compression-algorithms' ;
24
- import { LogVerbosity } from './constants' ;
24
+ import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH , LogVerbosity , Status } from './constants' ;
25
25
import { BaseFilter , Filter , FilterFactory } from './filter' ;
26
26
import * as logging from './logging' ;
27
27
import { Metadata , MetadataValue } from './metadata' ;
@@ -94,6 +94,10 @@ class IdentityHandler extends CompressionHandler {
94
94
}
95
95
96
96
class DeflateHandler extends CompressionHandler {
97
+ constructor ( private maxRecvMessageLength : number ) {
98
+ super ( ) ;
99
+ }
100
+
97
101
compressMessage ( message : Buffer ) {
98
102
return new Promise < Buffer > ( ( resolve , reject ) => {
99
103
zlib . deflate ( message , ( err , output ) => {
@@ -108,18 +112,34 @@ class DeflateHandler extends CompressionHandler {
108
112
109
113
decompressMessage ( message : Buffer ) {
110
114
return new Promise < Buffer > ( ( resolve , reject ) => {
111
- zlib . inflate ( message , ( err , output ) => {
112
- if ( err ) {
113
- reject ( err ) ;
114
- } else {
115
- resolve ( output ) ;
115
+ let totalLength = 0 ;
116
+ const messageParts : Buffer [ ] = [ ] ;
117
+ const decompresser = zlib . createInflate ( ) ;
118
+ decompresser . on ( 'data' , ( chunk : Buffer ) => {
119
+ messageParts . push ( chunk ) ;
120
+ totalLength += chunk . byteLength ;
121
+ if ( this . maxRecvMessageLength !== - 1 && totalLength > this . maxRecvMessageLength ) {
122
+ decompresser . destroy ( ) ;
123
+ reject ( {
124
+ code : Status . RESOURCE_EXHAUSTED ,
125
+ details : `Received message that decompresses to a size larger than ${ this . maxRecvMessageLength } `
126
+ } ) ;
116
127
}
117
128
} ) ;
129
+ decompresser . on ( 'end' , ( ) => {
130
+ resolve ( Buffer . concat ( messageParts ) ) ;
131
+ } ) ;
132
+ decompresser . write ( message ) ;
133
+ decompresser . end ( ) ;
118
134
} ) ;
119
135
}
120
136
}
121
137
122
138
class GzipHandler extends CompressionHandler {
139
+ constructor ( private maxRecvMessageLength : number ) {
140
+ super ( ) ;
141
+ }
142
+
123
143
compressMessage ( message : Buffer ) {
124
144
return new Promise < Buffer > ( ( resolve , reject ) => {
125
145
zlib . gzip ( message , ( err , output ) => {
@@ -134,13 +154,25 @@ class GzipHandler extends CompressionHandler {
134
154
135
155
decompressMessage ( message : Buffer ) {
136
156
return new Promise < Buffer > ( ( resolve , reject ) => {
137
- zlib . unzip ( message , ( err , output ) => {
138
- if ( err ) {
139
- reject ( err ) ;
140
- } else {
141
- resolve ( output ) ;
157
+ let totalLength = 0 ;
158
+ const messageParts : Buffer [ ] = [ ] ;
159
+ const decompresser = zlib . createGunzip ( ) ;
160
+ decompresser . on ( 'data' , ( chunk : Buffer ) => {
161
+ messageParts . push ( chunk ) ;
162
+ totalLength += chunk . byteLength ;
163
+ if ( this . maxRecvMessageLength !== - 1 && totalLength > this . maxRecvMessageLength ) {
164
+ decompresser . destroy ( ) ;
165
+ reject ( {
166
+ code : Status . RESOURCE_EXHAUSTED ,
167
+ details : `Received message that decompresses to a size larger than ${ this . maxRecvMessageLength } `
168
+ } ) ;
142
169
}
143
170
} ) ;
171
+ decompresser . on ( 'end' , ( ) => {
172
+ resolve ( Buffer . concat ( messageParts ) ) ;
173
+ } ) ;
174
+ decompresser . write ( message ) ;
175
+ decompresser . end ( ) ;
144
176
} ) ;
145
177
}
146
178
}
@@ -165,14 +197,14 @@ class UnknownHandler extends CompressionHandler {
165
197
}
166
198
}
167
199
168
- function getCompressionHandler ( compressionName : string ) : CompressionHandler {
200
+ function getCompressionHandler ( compressionName : string , maxReceiveMessageSize : number ) : CompressionHandler {
169
201
switch ( compressionName ) {
170
202
case 'identity' :
171
203
return new IdentityHandler ( ) ;
172
204
case 'deflate' :
173
- return new DeflateHandler ( ) ;
205
+ return new DeflateHandler ( maxReceiveMessageSize ) ;
174
206
case 'gzip' :
175
- return new GzipHandler ( ) ;
207
+ return new GzipHandler ( maxReceiveMessageSize ) ;
176
208
default :
177
209
return new UnknownHandler ( compressionName ) ;
178
210
}
@@ -182,11 +214,14 @@ export class CompressionFilter extends BaseFilter implements Filter {
182
214
private sendCompression : CompressionHandler = new IdentityHandler ( ) ;
183
215
private receiveCompression : CompressionHandler = new IdentityHandler ( ) ;
184
216
private currentCompressionAlgorithm : CompressionAlgorithm = 'identity' ;
217
+ private maxReceiveMessageLength : number ;
185
218
186
219
constructor ( channelOptions : ChannelOptions , private sharedFilterConfig : SharedCompressionFilterConfig ) {
187
220
super ( ) ;
188
221
189
- const compressionAlgorithmKey = channelOptions [ 'grpc.default_compression_algorithm' ] ;
222
+ const compressionAlgorithmKey =
223
+ channelOptions [ 'grpc.default_compression_algorithm' ] ;
224
+ this . maxReceiveMessageLength = channelOptions [ 'grpc.max_receive_message_length' ] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH
190
225
if ( compressionAlgorithmKey !== undefined ) {
191
226
if ( isCompressionAlgorithmKey ( compressionAlgorithmKey ) ) {
192
227
const clientSelectedEncoding = CompressionAlgorithms [ compressionAlgorithmKey ] as CompressionAlgorithm ;
@@ -200,7 +235,10 @@ export class CompressionFilter extends BaseFilter implements Filter {
200
235
*/
201
236
if ( ! serverSupportedEncodings || serverSupportedEncodings . includes ( clientSelectedEncoding ) ) {
202
237
this . currentCompressionAlgorithm = clientSelectedEncoding ;
203
- this . sendCompression = getCompressionHandler ( this . currentCompressionAlgorithm ) ;
238
+ this . sendCompression = getCompressionHandler (
239
+ this . currentCompressionAlgorithm ,
240
+ - 1
241
+ ) ;
204
242
}
205
243
} else {
206
244
logging . log ( LogVerbosity . ERROR , `Invalid value provided for grpc.default_compression_algorithm option: ${ compressionAlgorithmKey } ` ) ;
@@ -228,7 +266,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
228
266
if ( receiveEncoding . length > 0 ) {
229
267
const encoding : MetadataValue = receiveEncoding [ 0 ] ;
230
268
if ( typeof encoding === 'string' ) {
231
- this . receiveCompression = getCompressionHandler ( encoding ) ;
269
+ this . receiveCompression = getCompressionHandler ( encoding , this . maxReceiveMessageLength ) ;
232
270
}
233
271
}
234
272
metadata . remove ( 'grpc-encoding' ) ;
0 commit comments