Skip to content

Commit 8e62222

Browse files
committed
grpc-js: Avoid buffering significantly more than max_receive_message_size per received message (1.8.x)
1 parent 9d83947 commit 8e62222

9 files changed

+186
-148
lines changed

packages/grpc-js/src/compression-filter.ts

+55-17
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { WriteObject, WriteFlags } from './call-interface';
2121
import { Channel } from './channel';
2222
import { ChannelOptions } from './channel-options';
2323
import { CompressionAlgorithms } from './compression-algorithms';
24-
import { LogVerbosity } from './constants';
24+
import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, LogVerbosity, Status } from './constants';
2525
import { BaseFilter, Filter, FilterFactory } from './filter';
2626
import * as logging from './logging';
2727
import { Metadata, MetadataValue } from './metadata';
@@ -94,6 +94,10 @@ class IdentityHandler extends CompressionHandler {
9494
}
9595

9696
class DeflateHandler extends CompressionHandler {
97+
constructor(private maxRecvMessageLength: number) {
98+
super();
99+
}
100+
97101
compressMessage(message: Buffer) {
98102
return new Promise<Buffer>((resolve, reject) => {
99103
zlib.deflate(message, (err, output) => {
@@ -108,18 +112,34 @@ class DeflateHandler extends CompressionHandler {
108112

109113
decompressMessage(message: Buffer) {
110114
return new Promise<Buffer>((resolve, reject) => {
111-
zlib.inflate(message, (err, output) => {
112-
if (err) {
113-
reject(err);
114-
} else {
115-
resolve(output);
115+
let totalLength = 0;
116+
const messageParts: Buffer[] = [];
117+
const decompresser = zlib.createInflate();
118+
decompresser.on('data', (chunk: Buffer) => {
119+
messageParts.push(chunk);
120+
totalLength += chunk.byteLength;
121+
if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) {
122+
decompresser.destroy();
123+
reject({
124+
code: Status.RESOURCE_EXHAUSTED,
125+
details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}`
126+
});
116127
}
117128
});
129+
decompresser.on('end', () => {
130+
resolve(Buffer.concat(messageParts));
131+
});
132+
decompresser.write(message);
133+
decompresser.end();
118134
});
119135
}
120136
}
121137

122138
class GzipHandler extends CompressionHandler {
139+
constructor(private maxRecvMessageLength: number) {
140+
super();
141+
}
142+
123143
compressMessage(message: Buffer) {
124144
return new Promise<Buffer>((resolve, reject) => {
125145
zlib.gzip(message, (err, output) => {
@@ -134,13 +154,25 @@ class GzipHandler extends CompressionHandler {
134154

135155
decompressMessage(message: Buffer) {
136156
return new Promise<Buffer>((resolve, reject) => {
137-
zlib.unzip(message, (err, output) => {
138-
if (err) {
139-
reject(err);
140-
} else {
141-
resolve(output);
157+
let totalLength = 0;
158+
const messageParts: Buffer[] = [];
159+
const decompresser = zlib.createGunzip();
160+
decompresser.on('data', (chunk: Buffer) => {
161+
messageParts.push(chunk);
162+
totalLength += chunk.byteLength;
163+
if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) {
164+
decompresser.destroy();
165+
reject({
166+
code: Status.RESOURCE_EXHAUSTED,
167+
details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}`
168+
});
142169
}
143170
});
171+
decompresser.on('end', () => {
172+
resolve(Buffer.concat(messageParts));
173+
});
174+
decompresser.write(message);
175+
decompresser.end();
144176
});
145177
}
146178
}
@@ -165,14 +197,14 @@ class UnknownHandler extends CompressionHandler {
165197
}
166198
}
167199

168-
function getCompressionHandler(compressionName: string): CompressionHandler {
200+
function getCompressionHandler(compressionName: string, maxReceiveMessageSize: number): CompressionHandler {
169201
switch (compressionName) {
170202
case 'identity':
171203
return new IdentityHandler();
172204
case 'deflate':
173-
return new DeflateHandler();
205+
return new DeflateHandler(maxReceiveMessageSize);
174206
case 'gzip':
175-
return new GzipHandler();
207+
return new GzipHandler(maxReceiveMessageSize);
176208
default:
177209
return new UnknownHandler(compressionName);
178210
}
@@ -182,11 +214,14 @@ export class CompressionFilter extends BaseFilter implements Filter {
182214
private sendCompression: CompressionHandler = new IdentityHandler();
183215
private receiveCompression: CompressionHandler = new IdentityHandler();
184216
private currentCompressionAlgorithm: CompressionAlgorithm = 'identity';
217+
private maxReceiveMessageLength: number;
185218

186219
constructor(channelOptions: ChannelOptions, private sharedFilterConfig: SharedCompressionFilterConfig) {
187220
super();
188221

189-
const compressionAlgorithmKey = channelOptions['grpc.default_compression_algorithm'];
222+
const compressionAlgorithmKey =
223+
channelOptions['grpc.default_compression_algorithm'];
224+
this.maxReceiveMessageLength = channelOptions['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH
190225
if (compressionAlgorithmKey !== undefined) {
191226
if (isCompressionAlgorithmKey(compressionAlgorithmKey)) {
192227
const clientSelectedEncoding = CompressionAlgorithms[compressionAlgorithmKey] as CompressionAlgorithm;
@@ -200,7 +235,10 @@ export class CompressionFilter extends BaseFilter implements Filter {
200235
*/
201236
if (!serverSupportedEncodings || serverSupportedEncodings.includes(clientSelectedEncoding)) {
202237
this.currentCompressionAlgorithm = clientSelectedEncoding;
203-
this.sendCompression = getCompressionHandler(this.currentCompressionAlgorithm);
238+
this.sendCompression = getCompressionHandler(
239+
this.currentCompressionAlgorithm,
240+
-1
241+
);
204242
}
205243
} else {
206244
logging.log(LogVerbosity.ERROR, `Invalid value provided for grpc.default_compression_algorithm option: ${compressionAlgorithmKey}`);
@@ -228,7 +266,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
228266
if (receiveEncoding.length > 0) {
229267
const encoding: MetadataValue = receiveEncoding[0];
230268
if (typeof encoding === 'string') {
231-
this.receiveCompression = getCompressionHandler(encoding);
269+
this.receiveCompression = getCompressionHandler(encoding, this.maxReceiveMessageLength);
232270
}
233271
}
234272
metadata.remove('grpc-encoding');

packages/grpc-js/src/internal-channel.ts

-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import {
3333
} from './resolver';
3434
import { trace } from './logging';
3535
import { SubchannelAddress } from './subchannel-address';
36-
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
3736
import { mapProxyName } from './http_proxy';
3837
import { GrpcUri, parseUri, uriToString } from './uri-parser';
3938
import { ServerSurfaceCall } from './server-call';
@@ -310,7 +309,6 @@ export class InternalChannel {
310309
}
311310
);
312311
this.filterStackFactory = new FilterStackFactory([
313-
new MaxMessageSizeFilterFactory(this.options),
314312
new CompressionFilterFactory(this, this.options),
315313
]);
316314
this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2));

packages/grpc-js/src/max-message-size-filter.ts

-89
This file was deleted.

packages/grpc-js/src/server-call.ts

+56-31
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import { EventEmitter } from 'events';
1919
import * as http2 from 'http2';
2020
import { Duplex, Readable, Writable } from 'stream';
2121
import * as zlib from 'zlib';
22-
import { promisify } from 'util';
2322

2423
import {
2524
Status,
@@ -38,8 +37,6 @@ import { Deadline } from './deadline';
3837
import { getErrorCode, getErrorMessage } from './error';
3938

4039
const TRACER_NAME = 'server_call';
41-
const unzip = promisify(zlib.unzip);
42-
const inflate = promisify(zlib.inflate);
4340

4441
function trace(text: string): void {
4542
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
@@ -478,19 +475,42 @@ export class Http2ServerCallStream<
478475
private getDecompressedMessage(
479476
message: Buffer,
480477
encoding: string
481-
): Buffer | Promise<Buffer> {
482-
if (encoding === 'deflate') {
483-
return inflate(message.subarray(5));
484-
} else if (encoding === 'gzip') {
485-
return unzip(message.subarray(5));
486-
} else if (encoding === 'identity') {
487-
return message.subarray(5);
478+
): Buffer | Promise<Buffer> { const messageContents = message.subarray(5);
479+
if (encoding === 'identity') {
480+
return messageContents;
481+
} else if (encoding === 'deflate' || encoding === 'gzip') {
482+
let decompresser: zlib.Gunzip | zlib.Deflate;
483+
if (encoding === 'deflate') {
484+
decompresser = zlib.createInflate();
485+
} else {
486+
decompresser = zlib.createGunzip();
487+
}
488+
return new Promise((resolve, reject) => {
489+
let totalLength = 0
490+
const messageParts: Buffer[] = [];
491+
decompresser.on('data', (chunk: Buffer) => {
492+
messageParts.push(chunk);
493+
totalLength += chunk.byteLength;
494+
if (this.maxReceiveMessageSize !== -1 && totalLength > this.maxReceiveMessageSize) {
495+
decompresser.destroy();
496+
reject({
497+
code: Status.RESOURCE_EXHAUSTED,
498+
details: `Received message that decompresses to a size larger than ${this.maxReceiveMessageSize}`
499+
});
500+
}
501+
});
502+
decompresser.on('end', () => {
503+
resolve(Buffer.concat(messageParts));
504+
});
505+
decompresser.write(messageContents);
506+
decompresser.end();
507+
});
508+
} else {
509+
return Promise.reject({
510+
code: Status.UNIMPLEMENTED,
511+
details: `Received message compressed with unsupported encoding "${encoding}"`,
512+
});
488513
}
489-
490-
return Promise.reject({
491-
code: Status.UNIMPLEMENTED,
492-
details: `Received message compressed with unsupported encoding "${encoding}"`,
493-
});
494514
}
495515

496516
sendMetadata(customMetadata?: Metadata) {
@@ -807,7 +827,7 @@ export class Http2ServerCallStream<
807827
| ServerDuplexStream<RequestType, ResponseType>,
808828
encoding: string
809829
) {
810-
const decoder = new StreamDecoder();
830+
const decoder = new StreamDecoder(this.maxReceiveMessageSize);
811831

812832
let readsDone = false;
813833

@@ -823,29 +843,34 @@ export class Http2ServerCallStream<
823843
};
824844

825845
this.stream.on('data', async (data: Buffer) => {
826-
const messages = decoder.write(data);
846+
let messages: Buffer[];
847+
try {
848+
messages = decoder.write(data);
849+
} catch (e) {
850+
this.sendError({
851+
code: Status.RESOURCE_EXHAUSTED,
852+
details: (e as Error).message
853+
});
854+
return;
855+
}
827856

828857
pendingMessageProcessing = true;
829858
this.stream.pause();
830859
for (const message of messages) {
831-
if (
832-
this.maxReceiveMessageSize !== -1 &&
833-
message.length > this.maxReceiveMessageSize
834-
) {
835-
this.sendError({
836-
code: Status.RESOURCE_EXHAUSTED,
837-
details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`,
838-
});
839-
return;
840-
}
841860
this.emit('receiveMessage');
842861

843862
const compressed = message.readUInt8(0) === 1;
844863
const compressedMessageEncoding = compressed ? encoding : 'identity';
845-
const decompressedMessage = await this.getDecompressedMessage(
846-
message,
847-
compressedMessageEncoding
848-
);
864+
let decompressedMessage: Buffer;
865+
try {
866+
decompressedMessage = await this.getDecompressedMessage(
867+
message,
868+
compressedMessageEncoding
869+
);
870+
} catch (e) {
871+
this.sendError(e as Partial<StatusObject>);
872+
return;
873+
}
849874

850875
// Encountered an error with decompression; it'll already have been propogated back
851876
// Just return early

0 commit comments

Comments
 (0)