Skip to content

Commit 403ba53

Browse files
committed
Do not stack overflow if a TCP frame contains too many PUBLISH
1 parent 800a4dd commit 403ba53

File tree

2 files changed

+58
-3
lines changed

2 files changed

+58
-3
lines changed

lib/client.js

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,12 +249,16 @@ MqttClient.prototype._setupStream = function () {
249249
packets.push(packet)
250250
})
251251

252-
function process () {
252+
function nextTickWork () {
253+
process.nextTick(work)
254+
}
255+
256+
function work () {
253257
var packet = packets.shift()
254258
var done = completeParse
255259

256260
if (packet) {
257-
that._handlePacket(packet, process)
261+
that._handlePacket(packet, nextTickWork)
258262
} else {
259263
completeParse = null
260264
done()
@@ -264,7 +268,7 @@ MqttClient.prototype._setupStream = function () {
264268
writable._write = function (buf, enc, done) {
265269
completeParse = done
266270
parser.parse(buf)
267-
process()
271+
work()
268272
}
269273

270274
this.stream.pipe(writable)

test/client.js

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ var path = require('path')
77
var abstractClientTests = require('./abstract_client')
88
var net = require('net')
99
var eos = require('end-of-stream')
10+
var mqttPacket = require('mqtt-packet')
11+
var Buffer = require('safe-buffer').Buffer
12+
var Duplex = require('readable-stream').Duplex
1013
var Connection = require('mqtt-connection')
1114
var Server = require('./server')
1215
var port = 9876
@@ -148,6 +151,54 @@ describe('MqttClient', function () {
148151
})
149152
})
150153
})
154+
155+
it('should not go overflow if the TCP frame contains a lot of PUBLISH packets', function (done) {
156+
var parser = mqttPacket.parser()
157+
var count = 0
158+
var max = 1000
159+
var duplex = new Duplex({
160+
read: function (n) {},
161+
write: function (chunk, enc, cb) {
162+
parser.parse(chunk)
163+
cb() // nothing to do
164+
}
165+
})
166+
var client = new mqtt.MqttClient(function () {
167+
return duplex
168+
}, {})
169+
170+
client.on('message', function (t, p, packet) {
171+
if (++count === max) {
172+
done()
173+
}
174+
})
175+
176+
parser.on('packet', function (packet) {
177+
var packets = []
178+
179+
if (packet.cmd === 'connect') {
180+
duplex.push(mqttPacket.generate({
181+
cmd: 'connack',
182+
sessionPresent: false,
183+
returnCode: 0
184+
}))
185+
186+
for (var i = 0; i < max; i++) {
187+
packets.push(mqttPacket.generate({
188+
cmd: 'publish',
189+
topic: Buffer.from('hello'),
190+
payload: Buffer.from('world'),
191+
retain: false,
192+
dup: false,
193+
messageId: i + 1,
194+
qos: 1
195+
}))
196+
}
197+
198+
duplex.push(Buffer.concat(packets))
199+
}
200+
})
201+
})
151202
})
152203

153204
describe('reconnecting', function () {

0 commit comments

Comments
 (0)