Skip to content

Commit 5d9bf10

Browse files
authored
feat: emit Keepalive timeout error and speed up tests using fake timers (#1798)
* chore: speed up tests using fake timers * fix: typo * fix: improve some other tests * fix: improve flush test * fix: remove clock
1 parent e212fa7 commit 5d9bf10

File tree

3 files changed

+153
-51
lines changed

3 files changed

+153
-51
lines changed

src/lib/client.ts

+1
Original file line numberDiff line numberDiff line change
@@ -2100,6 +2100,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
21002100
this._sendPacket({ cmd: 'pingreq' })
21012101
} else {
21022102
// do a forced cleanup since socket will be in bad shape
2103+
this.emit('error', new Error('Keepalive timeout'))
21032104
this.log('_checkPing :: calling _cleanUp with force true')
21042105
this._cleanUp(true)
21052106
}

test/abstract_client.ts

+112-28
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { IPublishPacket, IPubrelPacket, ISubackPacket, QoS } from 'mqtt-packet'
2121
import { DoneCallback, ErrorWithReasonCode } from 'src/lib/shared'
2222
import { fail } from 'assert'
2323
import { describe, it, beforeEach, afterEach } from 'node:test'
24+
import { nextTick } from 'process'
2425

2526
/**
2627
* These tests try to be consistent with names for servers (brokers) and clients,
@@ -1309,7 +1310,7 @@ export default function abstractTest(server, config, ports) {
13091310
})
13101311
}
13111312
callback()
1312-
}, 100)
1313+
}, 10)
13131314
}
13141315

13151316
client.on('message', (topic, message, packet) => {
@@ -1342,7 +1343,7 @@ export default function abstractTest(server, config, ports) {
13421343

13431344
const qosTests = [0, 1, 2]
13441345
qosTests.forEach((qos) => {
1345-
it(`should publish 10 QoS ${qos}and receive them only when \`handleMessage\` finishes`, function _test(t, done) {
1346+
it(`should publish 10 QoS ${qos} and receive them only when \`handleMessage\` finishes`, function _test(t, done) {
13461347
testQosHandleMessage(qos, done)
13471348
})
13481349
})
@@ -2048,40 +2049,83 @@ export default function abstractTest(server, config, ports) {
20482049
})
20492050

20502051
it(
2051-
'should reconnect if pingresp is not sent',
2052+
'should reconnect on keepalive timeout',
20522053
{
2053-
timeout: 4000,
2054+
timeout: 10000,
20542055
},
20552056
function _test(t, done) {
2056-
const client = connect({ keepalive: 1, reconnectPeriod: 100 })
2057+
const clock = sinon.useFakeTimers()
20572058

2058-
// Fake no pingresp being send by stubbing the _handlePingresp function
2059-
client.on('packetreceive', (packet) => {
2060-
if (packet.cmd === 'pingresp') {
2061-
setImmediate(() => {
2062-
client.pingResp = false
2063-
})
2059+
t.after(() => {
2060+
clock.restore()
2061+
if (client) {
2062+
client.end(true)
2063+
throw new Error('Test timed out')
20642064
}
20652065
})
20662066

2067+
let client = connect({
2068+
keepalive: 60,
2069+
reconnectPeriod: 5000,
2070+
})
2071+
20672072
client.once('connect', () => {
2068-
client.once('connect', () => {
2069-
client.end(true, done)
2073+
client.pingResp = false
2074+
2075+
client.once('error', (err) => {
2076+
assert.equal(err.message, 'Keepalive timeout')
2077+
client.once('connect', () => {
2078+
client.end(true, done)
2079+
client = null
2080+
})
2081+
})
2082+
2083+
client.once('close', () => {
2084+
// Wait for the reconnect to happen
2085+
clock.tick(client.options.reconnectPeriod)
20702086
})
2087+
2088+
clock.tick(client.options.keepalive * 1000)
20712089
})
20722090
},
20732091
)
20742092

2075-
it('should not reconnect if pingresp is successful', function _test(t, done) {
2076-
const client = connect({ keepalive: 100 })
2077-
client.once('close', () => {
2078-
done(new Error('Client closed connection'))
2079-
})
2080-
setTimeout(() => {
2081-
client.removeAllListeners('close')
2082-
client.end(true, done)
2083-
}, 1000)
2084-
})
2093+
it(
2094+
'should not reconnect if pingresp is successful',
2095+
{ timeout: 1000 },
2096+
function _test(t, done) {
2097+
const clock = sinon.useFakeTimers()
2098+
2099+
t.after(() => {
2100+
clock.restore()
2101+
if (client) {
2102+
client.end(true)
2103+
}
2104+
})
2105+
2106+
let client = connect({ keepalive: 10 })
2107+
client.once('close', () => {
2108+
done(new Error('Client closed connection'))
2109+
})
2110+
2111+
client.once('connect', () => {
2112+
setImmediate(() => {
2113+
// make keepalive check trigger
2114+
clock.tick(client.options.keepalive * 1000)
2115+
})
2116+
2117+
client.on('packetsend', (packet) => {
2118+
if (packet.cmd === 'pingreq') {
2119+
client.removeAllListeners('close')
2120+
client.end(true, done)
2121+
client = null
2122+
}
2123+
})
2124+
2125+
clock.tick(1)
2126+
})
2127+
},
2128+
)
20852129

20862130
it('should defer the next ping when sending a control packet', function _test(t, done) {
20872131
const client = connect({ keepalive: 1 })
@@ -2866,13 +2910,22 @@ export default function abstractTest(server, config, ports) {
28662910
})
28672911

28682912
it('should reconnect after stream disconnect', function _test(t, done) {
2869-
const client = connect()
2913+
const clock = sinon.useFakeTimers()
2914+
2915+
t.after(() => {
2916+
clock.restore()
2917+
})
2918+
2919+
const client = connect({ reconnectPeriod: 1000 })
28702920

28712921
let tryReconnect = true
28722922

28732923
client.on('connect', () => {
28742924
if (tryReconnect) {
28752925
client.stream.end()
2926+
client.once('close', () => {
2927+
clock.tick(client.options.reconnectPeriod)
2928+
})
28762929
tryReconnect = false
28772930
} else {
28782931
client.end(true, done)
@@ -2881,7 +2934,15 @@ export default function abstractTest(server, config, ports) {
28812934
})
28822935

28832936
it("should emit 'reconnect' when reconnecting", function _test(t, done) {
2884-
const client = connect()
2937+
const clock = sinon.useFakeTimers()
2938+
2939+
t.after(() => {
2940+
clock.restore()
2941+
})
2942+
2943+
const client = connect({
2944+
reconnectPeriod: 1000,
2945+
})
28852946
let tryReconnect = true
28862947
let reconnectEvent = false
28872948

@@ -2892,6 +2953,9 @@ export default function abstractTest(server, config, ports) {
28922953
client.on('connect', () => {
28932954
if (tryReconnect) {
28942955
client.stream.end()
2956+
client.once('close', () => {
2957+
clock.tick(client.options.reconnectPeriod)
2958+
})
28952959
tryReconnect = false
28962960
} else {
28972961
assert.isTrue(reconnectEvent)
@@ -2901,7 +2965,14 @@ export default function abstractTest(server, config, ports) {
29012965
})
29022966

29032967
it("should emit 'offline' after going offline", function _test(t, done) {
2904-
const client = connect()
2968+
const clock = sinon.useFakeTimers()
2969+
2970+
t.after(() => {
2971+
clock.restore()
2972+
})
2973+
const client = connect({
2974+
reconnectPeriod: 1000,
2975+
})
29052976

29062977
let tryReconnect = true
29072978
let offlineEvent = false
@@ -2914,6 +2985,9 @@ export default function abstractTest(server, config, ports) {
29142985
if (tryReconnect) {
29152986
client.stream.end()
29162987
tryReconnect = false
2988+
client.once('close', () => {
2989+
clock.tick(client.options.reconnectPeriod)
2990+
})
29172991
} else {
29182992
assert.isTrue(offlineEvent)
29192993
client.end(true, done)
@@ -2956,18 +3030,28 @@ export default function abstractTest(server, config, ports) {
29563030
timeout: 10000,
29573031
},
29583032
function _test(t, done) {
3033+
const clock = sinon.useFakeTimers()
3034+
3035+
t.after(() => {
3036+
clock.restore()
3037+
})
3038+
29593039
let end
29603040
const reconnectSlushTime = 200
29613041
const client = connect({ reconnectPeriod: test.period })
29623042
let reconnect = false
2963-
const start = Date.now()
3043+
const start = clock.now
29643044

29653045
client.on('connect', () => {
29663046
if (!reconnect) {
29673047
client.stream.end()
3048+
client.once('close', () => {
3049+
// ensure the tick is done after the reconnect timer is setup (on close)
3050+
clock.tick(test.period)
3051+
})
29683052
reconnect = true
29693053
} else {
2970-
end = Date.now()
3054+
end = clock.now
29713055
client.end(() => {
29723056
const reconnectPeriodDuringTest = end - start
29733057
if (

test/client.ts

+40-23
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { useFakeTimers } from 'sinon'
12
import mqtt from '../src'
23
import { assert } from 'chai'
34
import { fork } from 'child_process'
@@ -181,34 +182,44 @@ describe('MqttClient', () => {
181182
host: 'localhost',
182183
keepalive: 1,
183184
connectTimeout: 350,
184-
reconnectPeriod: 0,
185+
reconnectPeriod: 0, // disable reconnect
185186
})
186187
client.once('connect', () => {
187188
client.publish(
188189
'fakeTopic',
189190
'fakeMessage',
190191
{ qos: 1 },
191192
(err) => {
193+
// connection closed
192194
assert.exists(err)
193195
pubCallbackCalled = true
194196
},
195197
)
196198
client.unsubscribe('fakeTopic', (err, result) => {
199+
// connection closed
197200
assert.exists(err)
198201
unsubscribeCallbackCalled = true
199202
})
200-
setTimeout(() => {
201-
client.end((err1) => {
202-
assert.strictEqual(
203-
pubCallbackCalled && unsubscribeCallbackCalled,
204-
true,
205-
'callbacks not invoked',
206-
)
207-
server2.close((err2) => {
208-
done(err1 || err2)
203+
204+
client.once('error', (err) => {
205+
assert.equal(err.message, 'Keepalive timeout')
206+
const originalFLush = client['_flush']
207+
// flush will be called on _cleanUp because of keepalive timeout
208+
client['_flush'] = function _flush() {
209+
originalFLush.call(client)
210+
client.end((err1) => {
211+
assert.strictEqual(
212+
pubCallbackCalled &&
213+
unsubscribeCallbackCalled,
214+
true,
215+
'callbacks should be invoked with error',
216+
)
217+
server2.close((err2) => {
218+
done(err1 || err2)
219+
})
209220
})
210-
})
211-
}, 5000)
221+
}
222+
})
212223
})
213224
},
214225
)
@@ -218,7 +229,7 @@ describe('MqttClient', () => {
218229
it(
219230
'should attempt to reconnect once server is down',
220231
{
221-
timeout: 30000,
232+
timeout: 5000,
222233
},
223234
function _test(t, done) {
224235
const args = ['-r', 'ts-node/register']
@@ -344,7 +355,7 @@ describe('MqttClient', () => {
344355
it(
345356
'should not keep requeueing the first message when offline',
346357
{
347-
timeout: 2500,
358+
timeout: 1000,
348359
},
349360
function _test(t, done) {
350361
const server2 = serverBuilder('mqtt').listen(ports.PORTAND45)
@@ -365,16 +376,22 @@ describe('MqttClient', () => {
365376
})
366377
})
367378

368-
setTimeout(() => {
369-
if (client.queue.length === 0) {
370-
debug('calling final client.end()')
371-
client.end(true, (err) => done(err))
372-
} else {
373-
debug('calling client.end()')
374-
// Do not call done. We want to trigger a reconnect here.
375-
client.end(true)
379+
let reconnections = 0
380+
381+
client.on('reconnect', () => {
382+
reconnections++
383+
if (reconnections === 2) {
384+
if (client.queue.length === 0) {
385+
debug('calling final client.end()')
386+
client.end(true, (err) => done(err))
387+
} else {
388+
debug('calling client.end()')
389+
// Do not call done. We want to trigger a reconnect here.
390+
client.end(true)
391+
done(Error('client queue not empty'))
392+
}
376393
}
377-
}, 2000)
394+
})
378395
},
379396
)
380397

0 commit comments

Comments
 (0)